Hi,

Thanks for your remarks.

L1: I would say "whoever can do more can do less": even though most users 
will simply fail and stop, we found it worthwhile to offer the option to 
fail-and-send-to-DLQ.

L2: We did not consider extending the TimestampExtractor because we consider it 
out of scope for this KIP. Perhaps it could be covered by an 
ExceptionHandler later.

L3: We will include an example in the KIP, but as we mentioned earlier, the DLQ 
topic can be different in each custom Exception Handler.

When providing custom handlers, users would have the possibility to return:
* FAIL
* CONTINUE
* FAIL.withDeadLetterQueueRecord(record, "dlq-topic")
* CONTINUE.withDeadLetterQueueRecord(record, "dlq-topic")
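
A minimal sketch of how a custom handler could pick between these four responses. The types below are hypothetical stand-ins written only for this example (they are not the Kafka Streams classes, and the final KIP API may differ); the routing rule and the topic names are assumptions too.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Hypothetical stand-ins for the API discussed in this thread. These are NOT
// the Kafka Streams classes, only a self-contained model of the four responses.
final class DlqResponseSketch {

    enum Action { FAIL, CONTINUE }

    // Simplified record; the proposal itself uses ProducerRecord<byte[], byte[]>.
    record RawRecord(byte[] key, byte[] value) {}

    // A record paired with the DLQ topic it should be written to.
    record DeadLetter(RawRecord payload, String topic) {}

    // Immutable response: an action plus an optional dead-letter record.
    record Response(Action action, Optional<DeadLetter> deadLetter) {
        static final Response FAIL = new Response(Action.FAIL, Optional.empty());
        static final Response CONTINUE = new Response(Action.CONTINUE, Optional.empty());

        Response withDeadLetterQueueRecord(RawRecord payload, String topic) {
            return new Response(action, Optional.of(new DeadLetter(payload, topic)));
        }
    }

    // Example custom handler. Routing by exception type is an arbitrary rule
    // chosen for illustration, as are the topic names.
    static Response handle(RawRecord payload, Exception exception) {
        if (exception instanceof IllegalArgumentException) {
            return Response.CONTINUE.withDeadLetterQueueRecord(payload, "dlq-invalid-input");
        }
        if (exception instanceof IllegalStateException) {
            return Response.FAIL.withDeadLetterQueueRecord(payload, "dlq-fatal");
        }
        return Response.FAIL; // no DLQ record at all
    }

    public static void main(String[] args) {
        RawRecord payload = new RawRecord(
            "key".getBytes(StandardCharsets.UTF_8),
            "value".getBytes(StandardCharsets.UTF_8));
        Response response = handle(payload, new IllegalArgumentException("bad payload"));
        String topic = response.deadLetter().map(DeadLetter::topic).orElse("(no DLQ)");
        System.out.println(response.action() + " -> " + topic);
    }
}
```

Modelling the response as an immutable value is a choice made for this sketch; it avoids attaching per-record state to shared enum constants.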

Cheers!
Sébastien


________________________________
From: Lucas Brutschy <lbruts...@confluent.io.INVALID>
Sent: Monday, 22 April 2024 14:36
To: dev@kafka.apache.org <dev@kafka.apache.org>
Subject: [EXT] Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams


Hi!

Thanks for the KIP, great stuff.

L1. I was a bit confused that the default configuration (once you set
a DLQ topic) is going to be fail-and-send-to-DLQ, if I understood
correctly. Is this something that will be a common use-case, and is it
a configuration that we want to encourage? I expected that you would either
want to fail or skip-and-send-to-DLQ.

L2. Have you considered extending the `TimestampExtractor` interface
so that it can also produce to DLQ? AFAIK it's not covered by any of
the existing exception handlers, but it can cause similar failures
(potentially custom logic, depends on the validity of the input record). There
could also be a default implementation as a subclass of
`ExtractRecordMetadataTimestamp`.

L3. It would be nice to include an example of how to produce to
multiple topics in the KIP, as I can imagine that this will be a
common use-case. I wasn't sure how much code would be involved to make
it work. If a lot of code is required, we may want to consider
exposing some utils that make it easier.

Cheers,
Lucas




On Sun, Apr 21, 2024 at 7:58 PM Damien Gasparina <d.gaspar...@gmail.com> wrote:
>
> Hi everyone,
>
> Following all the discussion on this KIP and KIP-1033, we introduced a
> new container class containing only processing context metadata:
> ProcessingMetadata. This new container class is actually part of
> KIP-1033, so I added a hard dependency for this KIP on KIP-1033; I
> think that is the wisest choice implementation-wise.
>
> I also clarified the interface of the enums:
> withDeadLetterQueueRecords(Iterable<org.apache.kafka.clients.producer.ProducerRecord<byte[],
> byte[]>> deadLetterQueueRecords). Very likely most users would just
> send one DLQ record, but there might be specific use-cases, and what
> can do more can do less, so I added an Iterable.
>
> I took some time to think about the impact of storing the
> ProcessingMetadata on the ProductionExceptionHandler. I think storing
> the topic/offset/partition should be fine, but I am concerned about
> storing the rawSourceKey/Value. I think it could impact some specific
> use-cases: for example, a high-throughput Kafka Streams application
> "counting" messages could have huge source input messages and very
> small sink messages. Here, I assume storing the rawSourceKey/Value
> could require significantly more memory than the actual Kafka Producer
> buffer.
>
> I think the safest approach is actually to only store the fixed-size
> metadata for the ProductionExceptionHandler.handle:
> topic/partition/offset/processorNodeId/taskId. It might be confusing
> for the user, but 1) it is still better than today, where there is
> no context information at all, 2) it would be clearly stated in the
> javadoc, 3) the rawSourceKey/Value are already nullable (e.g. the
> punctuate case).
>
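
A rough sketch of the trade-off described above, assuming a hypothetical `SourceMetadata` container for the small per-record fields. The workload numbers are made up for illustration and are not measurements.

```java
// Hypothetical sketch: neither the record type nor the numbers come from the
// KIP; they only illustrate the memory trade-off discussed above.
final class InFlightMetadataSketch {

    // Small, bounded metadata kept per in-flight record.
    record SourceMetadata(String topic, int partition, long offset,
                          String processorNodeId, String taskId) {}

    // Extra bytes retained if the raw source key and value were also kept
    // for every in-flight record.
    static long rawPayloadBytes(long inFlightRecords, long avgKeyBytes, long avgValueBytes) {
        return Math.multiplyExact(inFlightRecords, Math.addExact(avgKeyBytes, avgValueBytes));
    }

    public static void main(String[] args) {
        SourceMetadata metadata = new SourceMetadata("orders", 3, 42L, "count-processor", "0_3");
        System.out.println(metadata);

        // Assumed workload: 10,000 in-flight records, 100-byte keys, 1 MiB values.
        long bytes = rawPayloadBytes(10_000, 100, 1_048_576);
        System.out.printf("raw payloads would pin about %.2f GiB%n", bytes / (1024.0 * 1024 * 1024));
    }
}
```

With these assumed numbers the raw payloads dominate by several orders of magnitude, which is the case for keeping only the small metadata on the production path.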
> Do you think it would be a suitable design, Sophie?
>
> Cheers,
> Damien
>
> On Sun, 14 Apr 2024 at 21:30, Loic Greffier <loic.greff...@michelin.com> 
> wrote:
> >
> > Hi Sophie,
> >
> > Thanks for your feedback.
> > Adding to Damien's comments here for points S1 and S5B.
> >
> > S1:
> > > I'm confused -- are you saying that we're introducing a new kind of 
> > > ProducerRecord class for this?
> >
> > I am wondering if it makes sense to alter the ProducerRecord from Clients 
> > API with a "deadLetterQueueTopicName" attribute dedicated to Kafka Streams 
> > DLQ.
> > Adding "deadLetterQueueTopicName" as an additional parameter to 
> > "withDeadLetterQueueRecord" is a good option, and may allow users to send 
> > records to different DLQ topics depending on conditions:
> > @Override
> > public ProductionExceptionHandlerResponse handle(final ProcessingContext context,
> >                                                  ProducerRecord<byte[], byte[]> record,
> >                                                  Exception exception) {
> >     // condition1 and condition2 stand for arbitrary user-defined checks
> >     if (condition1) {
> >         return ProductionExceptionHandlerResponse.CONTINUE
> >             .withDeadLetterQueueRecord(record, "dlq-topic-a");
> >     }
> >     if (condition2) {
> >         return ProductionExceptionHandlerResponse.CONTINUE
> >             .withDeadLetterQueueRecord(record, "dlq-topic-b");
> >     }
> >     return ProductionExceptionHandlerResponse.CONTINUE
> >         .withDeadLetterQueueRecord(record, "dlq-topic-c");
> > }
> >
> > S5B:
> > > I was having a bit of trouble understanding what the behavior would be if 
> > > someone configured a "errors.deadletterqueue.topic.name" but didn't 
> > > implement the handlers.
> >
> > The provided LogAndContinueExceptionHandler, LogAndFailExceptionHandler and 
> > DefaultProductionExceptionHandler should be able to tell if records should 
> > be sent to DLQ or not.
> > The "errors.deadletterqueue.topic.name" configuration serves two purposes:
> >
> > * Specify whether the provided handlers should send records to the DLQ:
> >   * If the value is empty, the handlers should not send records to the DLQ.
> >   * If the value is not empty, the handlers should send records to the DLQ.
> > * Define the name of the DLQ topic that should be used by the provided 
> > handlers.
> >
> > Thus, if "errors.deadletterqueue.topic.name" is defined, the provided 
> > handlers should return either:
> >
> > * CONTINUE.withDeadLetterQueueRecord(record, defaultDeadLetterQueue)
> > * FAIL.withDeadLetterQueueRecord(record, defaultDeadLetterQueue)
> >
> > If "errors.deadletterqueue.topic.name" is defined but neither 
> > DeserializationExceptionHandler nor ProductionExceptionHandler classes are 
> > defined in the configuration, then nothing should happen as sending to DLQ 
> > is based on handlers’ response.
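
A minimal sketch of the behaviour described above for the bundled handlers. Only the config key comes from the discussion; the `Response` type and the `respond` helper are hypothetical and written just for this example.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical model of the bundled handlers' behaviour; only the config key
// "errors.deadletterqueue.topic.name" is taken from the discussion.
final class DefaultDlqConfigSketch {

    static final String DLQ_TOPIC_CONFIG = "errors.deadletterqueue.topic.name";

    enum Action { FAIL, CONTINUE }

    // The handler's decision plus the DLQ topic to write to, if any.
    record Response(Action action, Optional<String> dlqTopic) {}

    // A bundled handler attaches a DLQ topic only when the config is non-empty.
    static Response respond(Action action, Map<String, String> config) {
        String topic = config.getOrDefault(DLQ_TOPIC_CONFIG, "");
        return new Response(action, topic.isEmpty() ? Optional.empty() : Optional.of(topic));
    }

    public static void main(String[] args) {
        Map<String, String> withoutDlq = Map.of();
        Map<String, String> withDlq = Map.of(DLQ_TOPIC_CONFIG, "my-app-dlq");

        System.out.println(respond(Action.CONTINUE, withoutDlq)); // no DLQ topic attached
        System.out.println(respond(Action.FAIL, withDlq));        // DLQ topic attached
    }
}
```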
> > When providing custom handlers, users would have the possibility to return:
> >
> > * FAIL
> > * CONTINUE
> > * FAIL.withDeadLetterQueueRecord(record, "dlq-topic")
> > * CONTINUE.withDeadLetterQueueRecord(record, "dlq-topic")
> >
> > A DLQ topic name is currently required when using the last two response types.
> > I am wondering whether it would help users to make the default DLQ 
> > topic "errors.deadletterqueue.topic.name" easier to use when implementing 
> > custom handlers, with something like:
> >
> > * FAIL.withDefaultDeadLetterQueueRecord(record)
> > * CONTINUE.withDefaultDeadLetterQueueRecord(record)
> >
> > Regards,
> > Loïc
> >
> > From: Damien Gasparina <d.gaspar...@gmail.com>
> > Sent: Sunday, 14 April 2024 20:24
> > To: dev@kafka.apache.org
> > Subject: [EXT] Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
> >
> >
> > Hi Sophie,
> >
> > Thanks a lot for your feedback and your detailed comments.
> >
> > S1.
> > > I'm confused -- are you saying that we're introducing a new kind of
> > ProducerRecord class for this?
> >
> > Sorry for the poor wording, that's not what I meant. While writing the
> > KIP, I was hesitating between 1. leveraging the Kafka Producer
> > ProducerRecord, 2. the Kafka Streams ProducerRecord + a topic name in
> > a separate parameter, 3. a new custom interface (e.g.
> > DeadLetterQueueRecord).
> > As the KafkaProducer ProducerRecord is not used in the Kafka Streams
> > API (except ProductionExceptionHandler) and I would like to avoid a
> > new interface if not strictly required, I leaned toward option 2.
> > Thinking about it, maybe option 1. would be best, but I assume it
> > could create confusion with KafkaStreams ProducerRecord. Let me sleep
> > on it.
> >
> > S2. I agree. Following the discussion in KIP-1033 and KIP-1034 and
> > your point in S4, it seems more and more likely that we will create a
> > new container class containing only the metadata for the exception
> > handlers. To be consistent, I think we should use this new
> > implementation in all exception handlers.
> > The only issue I could think of is that the new interface would
> > expose less data than the current ProcessorContext in the
> > DeserializationExceptionHandler (e.g. stateDir(), metrics(), getStateStore()),
> > thus it could be hard for some users to migrate to the new interface.
> > I do expect that only a few users would be impacted as the javadoc is
> > very clear: `Note, that the passed in {@link ProcessorContext} only
> > allows access to metadata like the task ID.`
> >
> > S3. I completely agree with you, it is something that might not be
> > trivial and should be thoroughly covered by unit tests during the
> > implementation.
> >
> > S4. Good point, I did not notice that the ProductionExceptionHandler
> > is also invoked in the producer.send() callback.
> > Capturing the ProcessingContext for each in-flight message is probably
> > not possible. I think there is no other way than to write a custom
> > container class holding only the essential metadata. I am
> > thinking of storing the following attributes: source topic, partition,
> > offset, rawKey, rawValue and taskId.
> > Those metadata should be relatively small, but I assume that there
> > could be a high number of in-flight messages, especially with the
> > at-least-once processing guarantee. Do you think it would be fine memory-wise?
> >
> > S5. As many exceptions are only accessible in exception handlers, and
> > we wanted to 1) allow users to customize the DLQ records and 2) have a
> > suitable DLQ out of the box implementation, we felt it natural to rely
> > on exception handlers, that's also why we created KIP-1033.
> > Piggybacking on the enum response was the cleanest way we could think
> > of, but we are completely open to suggestions.
> >
> > S5a. Completely agree with you on this point, for this DLQ approach to
> > be complete, the ProcessingExceptionHandler introduced in KIP-1033 is
> > required. KIP-1033 is definitely our first priority. We decided to
> > kick-off the KIP-1034 discussion as we expected the discussions to be
> > dynamic and could potentially impact some choices of KIP-1033.
> >
> > S5b. In this KIP, we wanted to 1. provide as much flexibility to the
> > user as possible; 2. provide a good default implementation
> > for the DLQ without having to write custom exception handlers.
> > For the default implementation, we introduced a new configuration:
> > errors.deadletterqueue.topic.name.
> >
> > If this configuration is set, it changes the behavior of the provided
> > exception handlers to return a DLQ record containing the raw key/value
> > + headers + exception metadata in headers.
> > If the out-of-the-box implementation is not suitable for a user, e.g.
> > the payload needs to be masked in the DLQ, they could implement their
> > own exception handlers. The errors.deadletterqueue.topic.name would
> > only impact the exception handlers bundled with Kafka Streams (e.g.
> > org.apache.kafka.streams.errors.LogAndContinueExceptionHandler).
> >
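
A minimal sketch of what such a default DLQ record could look like: the raw key/value are kept untouched, the source headers are forwarded, and the exception details are added as headers. The header names are placeholders invented for this example (not the names defined by the KIP), and a `Map` is used as a simplification of Kafka's multi-valued headers.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: the header names below are placeholders, not the names
// defined by the KIP, and a Map stands in for Kafka's multi-valued headers.
final class DlqRecordSketch {

    record DlqRecord(byte[] key, byte[] value, Map<String, byte[]> headers) {}

    static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // Keeps the raw key/value untouched, forwards the source headers, and adds
    // the exception details as extra headers.
    static DlqRecord build(byte[] rawKey, byte[] rawValue,
                           Map<String, byte[]> sourceHeaders, Exception exception) {
        Map<String, byte[]> headers = new LinkedHashMap<>(sourceHeaders);
        headers.put("example.exception.class", utf8(exception.getClass().getName()));
        headers.put("example.exception.message", utf8(String.valueOf(exception.getMessage())));

        StringWriter stackTrace = new StringWriter();
        exception.printStackTrace(new PrintWriter(stackTrace, true));
        headers.put("example.exception.stacktrace", utf8(stackTrace.toString()));

        return new DlqRecord(rawKey, rawValue, headers);
    }

    public static void main(String[] args) {
        Map<String, byte[]> sourceHeaders = Map.of("correlation-id", utf8("abc-123"));
        DlqRecord dlq = build(utf8("key"), utf8("value"), sourceHeaders,
                              new IllegalStateException("cannot deserialize"));
        dlq.headers().keySet().forEach(System.out::println);
    }
}
```

A handler that needs to mask the payload could follow the same shape and replace `rawValue` before building the record.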
> > Let me update the KIP to make it clear and also provide examples.
> >
> > S6/S7. Good point, mea culpa for the camel case, it must have been a
> > sugar rush :)
> >
> > Thanks again for your detailed comments and pointing out S4
> > (production exception & Processing Context)!
> >
> > Cheers,
> > Damien
> >
> >
> >
> >
> > On Fri, 12 Apr 2024 at 22:04, Sophie Blee-Goldman 
> > <sop...@responsive.dev<mailto:sop...@responsive.dev>> wrote:
> > >
> > > Thanks for the KIP, this will make a lot of people very happy.
> > >
> > > Wanted to chime in on a few points that have been raised so far and add
> > > some of my own (numbering with an S to distinguish my points from the
> > > previous ones)
> > >
> > > S1.
> > >
> > > > 1.a I really meant ProducerRecord, that's the class used to forward to
> > > > downstream processors in the PAPI. The only information missing in
> > > > this class is the topic name. I also considered relying on the Kafka
> > > > Producer ProducerRecord, but I assume it would not be consistent with
> > > > the KafkaStreams API.
> > >
> > > I'm confused -- are you saying that we're introducing a new kind of
> > > ProducerRecord class for this? Why not just use the existing one, ie the
> > > o.a.k.clients.producer.ProducerRecord class? This is what the
> > > ProductionExceptionHandler uses, so it's definitely "consistent". In other
> > > words, we can remove the "String deadLetterQueueTopicName"
> > >
> > > S2.
> > > I think this would be a good opportunity to also deprecate the existing
> > > #handle method of the DeserializationExceptionHandler, and replace it with
> > > one that uses a ProcessingContext instead of the ProcessorContext. Partly
> > > for the same reasons about guarding access to the #forward methods, partly
> > > because this method needs to be migrated to the new PAPI interface
> > > anyways, and ProcessingContext is part of the new one.
> > >
> > > S3.
> > > Regarding 2a. -- I'm inclined to agree that records which a Punctuator
> > > failed to produce should also be sent to the DLQ via the
> > > ProductionExceptionHandler. Users will just need to be careful about
> > > accessing certain fields of the ProcessingContext that aren't available in
> > > the punctuator, and need to check the Optional returned by the
> > > ProcessingContext#recordMetadata API.
> > > Also, from an implementation standpoint, it will be really hard to
> > > distinguish between a record created by a punctuator vs a processor from
> > > within the RecordCollector, which is the class that actually handles
> > > sending records to the Streams Producer and invoking the
> > > ProductionExceptionHandler. This is because the RecordCollector is at the
> > > "end" of the topology graph and doesn't have any context about which of 
> > > the
> > > upstream processors actually attempted to forward a record.
> > >
> > > This in itself is at least theoretically solvable, but it leads into my
> > > first major new point:
> > >
> > > S4:
> > > I'm deeply worried about passing the ProcessingContext in as a means of
> > > forwarding metadata. The problem is that the processing/processor context
> > > is a mutable class and is inherently meaningless outside the context of a
> > > specific task. And when I said earlier that the RecordCollector sits at
> > > the "end" of the topology, I meant that it's literally outside the task's
> > > subtopology and is used/shared by all tasks on that StreamThread. So to
> > > begin with, there's no guarantee what will actually be returned for
> > > essential methods such as the new #rawSourceKey/Value or the existing
> > > #recordMetadata
> > >
> > > For serialization exceptions it'll probably be correct, but for general
> > > send errors it almost definitely won't be. In short, this is because we
> > > send records to the producer after the sink node, but don't check for send
> > > errors right away since obviously it takes some time for the producer to
> > > actually send. In other words, sending/producing records is actually done
> > > asynchronously with processing, and we simply check for errors on any
> > > previously-sent records
> > > during the send on a new record in a sink node. This means the context we
> > > would be passing in to a (non-serialization) exception would pretty much
> > > > always correspond not to the record that experienced the error, but the
> > > random record that happened to be being sent when we checked and saw the
> > > error for the failed record.
> > >
> > > This discrepancy, in addition to the whole "sourceRawKey/Value and
> > > recordMetadata are null for punctuators" issue, seems like an
> > > insurmountable inconsistency that is more likely to cause users confusion
> > > or problems than be helpful.
> > > We could create a new metadata object and copy over the relevant info from
> > > > the ProcessingContext, but I worry that has the potential to explode
> > > > memory since we'd need to hold on to it for all in-flight records up
> > > > until they
> > > are either successfully sent or failed and passed in to the
> > > ProductionExceptionHandler. But if the metadata is relatively small, it's
> > > probably fine. Especially if it's just the raw source key/value. Are
> > > there any other parts of the ProcessingContext you think should be made
> > > available?
> > >
> > > Note that this only applies to the ProductionExceptionHandler, as the
> > > DeserializationExceptionHandler (and the newly proposed
> > > > ProcessingExceptionHandler) would both be invoked immediately and
> > > > therefore with the failed record's context. However, I'm also a bit
> > > > uncomfortable with adding the rawSourceKey/rawSourceValue to the
> > > > ProcessingContext. So I'd propose to just wrap those (and any other
> > > > metadata you might want) in a container class and pass that in instead
> > > > of the ProcessingContext, to all
> > > of the exception handlers.
> > >
> > > S5:
> > > > For some reason I'm finding the proposed API a little bit awkward,
> > > > although it's entirely possible that the problem is with me, not the
> > > > proposal :)
> > > Specifically I'm struggling with the approach of piggybacking on the
> > > exception handlers and their response enums to dictate how records are
> > > forwarded to the DLQ. I think this comes down to two things, though again,
> > > > these aren't necessarily problems with the API and probably just need to
> > > > be hashed out:
> > >
> > > S5a.
> > > When I envision a DLQ, to me, the most common use case would be to forward
> > > input records that failed somewhere along the processing graph. But it
> > > seems like all the focus here is on the two far ends of the subtopology --
> > > the input/consumer, and the output/producer. I get that
> > > the ProcessingExceptionHandler is really the missing piece here, and it's
> > > hard to say anything specific since it's not yet accepted, but maybe a
> > > somewhat more concrete example would help. FWIW I think/hope to get that
> > > > KIP accepted and implemented ASAP, so I'm not worried about the "what
> > > > if it doesn't happen" case -- more just want to know what it will look
> > > > like
> > > when it does. Imo it's fine to build KIPs on top of future ones, it feels
> > > clear that this part will just have to wait for that KIP to actually be
> > > added.
> > >
> > > S5b:
> > > Why do users have to define the entire ProducerRecord -- shouldn't Streams
> > > handle all this for them? Or will we just automatically send every record
> > > on failure to the default global DLQ, and users only have to implement the
> > > > handlers if they want to change the headers or send to a different
> > > > topic? I was having a bit of trouble understanding what the behavior
> > > > would be if
> > > someone configured a "errors.deadletterqueue.topic.name" but didn't
> > > implement the handlers. Apologies if it's somewhere in the KIP and I
> > > happened to miss it!
> > >
> > > Either way, I really think an example would help me to better imagine what
> > > this will look like in practice, and evaluate whether it actually involves
> > > as much overhead as I'm worried it will. Can you add a section that
> > > includes a basic implementation of all the features here? Nothing too
> > > complicated, just the most bare-bones code needed to actually implement
> > > forwarding to a dead-letter-queue via the handlers.
> > >
> > > Lastly, two super small things:
> > >
> > > S6:
> > > We use camel case in Streams, so it should be rawSourceKey/Value rather
> > > than raw_source_key/value
> > >
> > > S7:
> > > Can you add javadocs for the #withDeadLetterQueueRecord? For example, it
> > > seems to me that if the topic to be sent to here is different than the
> > > default/global DLQ, then the user will need to make sure to have created
> > > this themselves up front.
> > >
> > > That's it from me...sorry for the long response, it's just because I'm
> > > > excited for this feature and have been waiting on a KIP for this for
> > > > years.
> > >
> > > Cheers,
> > > Sophie
> > >
> > >
> > > On Fri, Apr 12, 2024 at 11:10 AM Damien Gasparina 
> > > <d.gaspar...@gmail.com<mailto:d.gaspar...@gmail.com>>
> > > wrote:
> > >
> > > > Hi Andrew,
> > > >
> > > > Thanks a lot for your review, plenty of good points!
> > > >
> > > > > 11. Typo fixed, good catch.
> > > >
> > > > 12. I do agree with you and Nick also mentioned it, I updated the KIP
> > > > to mention that context headers should be forwarded.
> > > >
> > > > 13. Good catch, to be consistent with KIP-298, and without a strong
> > > > opinion from my side, I updated the KIP with your prefix proposal.
> > > >
> > > > 14. I am not sure about this point, a big difference between KIP-298
> > > > and this one is that the handlers can easily be overridden, something
> > > > that is not doable in Kafka Connect.
> > > > If someone would like a different behavior, e.g. to mask the payload
> > > > or include further headers, I think we should encourage them to write
> > > > their own exception handlers to build the DLQ Record the way they
> > > > expect.
> > > >
> > > > 15. Yeah, that's a good point, I was not fully convinced about putting
> > > > a String in it, I do assume that "null" is also a valid value. I do
> > > > assume that the Stacktrace and the Exception in this case are the key
> > > > metadata for the user to troubleshoot the problem.
> > > > I updated the KIP to mention that the value should be null if
> > > > triggered in a punctuate.
> > > >
> > > > > 16. I added a section to mention that Kafka Streams would not try to
> > > > automatically create the topic and the topic should either be
> > > > automatically created, or pre-created.
> > > >
> > > > > 17. If a DLQ record cannot be sent, the exception should go to the
> > > > uncaughtExceptionHandler. Let me clearly state it in the KIP.
> > > >
> > > > On Fri, 12 Apr 2024 at 17:25, Damien Gasparina 
> > > > <d.gaspar...@gmail.com<mailto:d.gaspar...@gmail.com>>
> > > > wrote:
> > > > >
> > > > > Hi Nick,
> > > > >
> > > > > 1. Good point, that's less impactful than a custom interface, I just
> > > > > updated the KIP with the new signature.
> > > > >
> > > > > 1.a I really meant ProducerRecord, that's the class used to forward to
> > > > > downstream processors in the PAPI. The only information missing in
> > > > > this class is the topic name. I also considered relying on the Kafka
> > > > > Producer ProducerRecord, but I assume it would not be consistent with
> > > > > the KafkaStreams API.
> > > > >
> > > > > 2. Agreed
> > > > >
> > > > > 2.a I do think exceptions occurring during punctuate should be
> > > > > included in the DLQ.
> > > > > Even if building a suitable payload is almost impossible, even with
> > > > > custom code; those exceptions are still fatal for Kafka Streams by
> > > > > default and are something that can not be ignored safely.
> > > > > I do assume that most users would want to be informed if an error
> > > > > happened during a punctuate, even if only the metadata (e.g.
> > > > > stacktrace, exception) is provided.
> > > > > > I am only concerned about flooding the DLQ topic since, if a scheduled
> > > > > > operation failed, it will very likely fail during the next
> > > > > > invocation, but
> > > > >
> > > > > 4. Good point, I clarified the wording in the KIP to make it explicit.
> > > > >
> > > > > 5. Good point, I will clearly mention that it is out of scope as part
> > > > > of the KIP and might not be as trivial as people could expect. I will
> > > > > update the KIP once I do have some spare time.
> > > > >
> > > > > 6. Oh yeah, I didn't think about it, but forwarding input headers
> > > > > would definitely make sense. Confluent Schema Registry ID is actually
> > > > > part of the payload, but many correlation ID and technical metadata
> > > > > > are passed through headers, it makes sense to forward them, especially
> > > > > > as it is the default behavior of Kafka Streams.
> > > > >
> > > > >
> > > > >
> > > > > On Fri, 12 Apr 2024 at 15:25, Nick Telford 
> > > > > <nick.telf...@gmail.com<mailto:nick.telf...@gmail.com>>
> > > > wrote:
> > > > > >
> > > > > > Hi Damien and Sebastien,
> > > > > >
> > > > > > 1.
> > > > > > > I think you can just add a `String topic` argument to the existing
> > > > > > > `withDeadLetterQueueRecord(ProducerRecord<byte[], byte[]>
> > > > > > > deadLetterQueueRecord)` method, and then the implementation of the
> > > > > > > exception handler could choose the topic to send records to using
> > > > > > > whatever logic the user desires. You could perhaps provide a built-in
> > > > > > > implementation that leverages your new config to send all records to
> > > > > > > an untyped DLQ topic?
> > > > > >
> > > > > > 1a.
> > > > > > > BTW you have a typo: in your DeserializationExceptionHandler, the
> > > > > > > type of your `deadLetterQueueRecord` argument is `ProducerRecord`,
> > > > > > > when it should probably be `ConsumerRecord`.
> > > > > >
> > > > > > 2.
> > > > > > > Agreed. I think it's a good idea to provide an implementation that
> > > > > > > sends to a single DLQ by default, but it's important to enable users
> > > > > > > to customize this with their own exception handlers.
> > > > > >
> > > > > > 2a.
> > > > > > > I'm not convinced that "errors" (e.g. failed punctuate) should be
> > > > > > > sent to a DLQ topic like it's a bad record. To me, a DLQ should only
> > > > > > > contain records that failed to process. I'm not even sure how a user
> > > > > > > would re-process/action one of these other errors; it seems like the
> > > > > > > purview of error logging to me?
> > > > > >
> > > > > > 4.
> > > > > > > My point here was that I think it would be useful for the KIP to
> > > > > > > contain an explanation of the behavior both with KIP-1033 and without
> > > > > > > it, i.e. clarify if/how records that throw an exception in a processor
> > > > > > > are handled. At the moment, I'm assuming that without KIP-1033,
> > > > > > > processing exceptions would not cause records to be sent to the DLQ,
> > > > > > > but with KIP-1033, they would. If this assumption is correct, I think
> > > > > > > it should be made explicit in the KIP.
> > > > > >
> > > > > > 5.
> > > > > > > Understood. You may want to make this explicit in the documentation
> > > > > > > for users, so they understand the consequences of re-processing data
> > > > > > > sent to their DLQ. The main reason I raised this point is it's
> > > > > > > something that's tripped me up in numerous KIPs that committers
> > > > > > > frequently remind me of; so I wanted to get ahead of it for once! :D
> > > > > >
> > > > > > And one new point:
> > > > > > 6.
> > > > > > > The DLQ record schema appears to discard all custom headers set on
> > > > > > > the source record. Is there a way these can be included? In
> > > > > > > particular, I'm concerned with "schema pointer" headers (like those
> > > > > > > set by Schema Registry), that may need to be propagated, especially
> > > > > > > if the records are fed back into the source topics for re-processing
> > > > > > > by the user.
> > > > > >
> > > > > > Regards,
> > > > > > Nick
> > > > > >
> > > > > >
> > > > > > On Fri, 12 Apr 2024 at 13:20, Damien Gasparina 
> > > > > > <d.gaspar...@gmail.com<mailto:d.gaspar...@gmail.com>>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Nick,
> > > > > > >
> > > > > > > Thanks a lot for your review and your useful comments!
> > > > > > >
> > > > > > > > 1. It is a good point, as you mentioned, I think it would make
> > > > > > > > sense in some use cases to have potentially multiple DLQ topics, so
> > > > > > > > we should provide an API to let users do it.
> > > > > > > > Thinking out loud here, maybe it is a better approach to create a
> > > > > > > > new Record class containing the topic name, e.g.
> > > > > > > > DeadLetterQueueRecord, and changing the signature to
> > > > > > > > withDeadLetterQueueRecords(Iterable<DeadLetterQueueRecord>
> > > > > > > > deadLetterQueueRecords) instead of
> > > > > > > > withDeadLetterQueueRecord(ProducerRecord<byte[], byte[]>
> > > > > > > > deadLetterQueueRecord). What do you think? DeadLetterQueueRecord
> > > > > > > > would be something like "class DeadLetterQueueRecord extends
> > > > > > > > org.apache.kafka.streams.processor.api.ProducerRecords { String
> > > > > > > > topic; /* + getter/setter + */ } "
> > > > > > >
> > > > > > > > 2. I think the root question here is: should we have one DLQ
> > > > > > > > topic or multiple DLQ topics by default? This question highly
> > > > > > > > depends on the context, but implementing a default implementation to
> > > > > > > > handle multiple DLQ topics would be opinionated, e.g. how to manage
> > > > > > > > errors in a punctuate?
> > > > > > > > I think it makes sense to have the default implementation writing
> > > > > > > > all faulty records to a single DLQ, that's at least the approach I
> > > > > > > > used in past applications: one DLQ per Kafka Streams application. Of
> > > > > > > > course the message format could change in the DLQ, e.g. due to the
> > > > > > > > source topic, but those DLQ records will very likely be
> > > > > > > > troubleshot, and maybe replayed, manually anyway.
> > > > > > > > If a user needs to have multiple DLQ topics or wants to enforce a
> > > > > > > > specific schema, it's still possible, but they would need to
> > > > > > > > implement custom Exception Handlers.
> > > > > > > > Coming back to 1. I do agree that it would make sense to have the
> > > > > > > > user set the DLQ topic name in the handlers for more flexibility.
> > > > > > >
> > > > > > > > 3. Good point, sorry it was a typo, the ProcessingContext makes
> > > > > > > > much more sense here indeed.
> > > > > > >
> > > > > > > > 4. I do assume that we could implement KIP-1033 (Processing
> > > > > > > > exception handler) independently from KIP-1034. I do hope that
> > > > > > > > KIP-1033 would be adopted and implemented before KIP-1034, but if
> > > > > > > > that's not the case, we could implement KIP-1034 independently and
> > > > > > > > update KIP-1033 to include the DLQ record afterward (in the same KIP
> > > > > > > > or in a new one if not possible).
> > > > > > >
> > > > > > > > 5. I think we should be clear that this KIP only covers the DLQ
> > > > > > > > record produced.
> > > > > > > > Everything related to replaying messages or recovery plans should be
> > > > > > > > considered out-of-scope as it is use-case and error specific.
> > > > > > >
> > > > > > > > Let me know if that's not clear, there are definitely points that
> > > > > > > > are highly debatable.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Damien
> > > > > > >
> > > > > > > On Fri, 12 Apr 2024 at 13:00, Nick Telford <nick.telf...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > Oh, and one more thing:
> > > > > > > >
> > > > > > > > 5.
> > > > > > > > Whenever you take a record out of the stream, and then potentially
> > > > > > > > re-introduce it at a later date, you introduce the potential for record
> > > > > > > > ordering issues. For example, that record could have been destined for a
> > > > > > > > Window that has been closed by the time it's re-processed. I'd like to see
> > > > > > > > a section that considers these consequences, and perhaps make those risks
> > > > > > > > clear to users. For the record, this is exactly what sunk KIP-990, which
> > > > > > > > was an alternative approach to error handling that introduced the same
> > > > > > > > issues.
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > >
> > > > > > > > Nick
> > > > > > > >
> > > > > > > > On Fri, 12 Apr 2024 at 11:54, Nick Telford <nick.telf...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hi Damien,
> > > > > > > > >
> > > > > > > > > Thanks for the KIP! Dead-letter queues are something that I think a lot of
> > > > > > > > > users would like.
> > > > > > > > >
> > > > > > > > > I think there are a few points with this KIP that concern me:
> > > > > > > > >
> > > > > > > > > 1.
> > > > > > > > > It looks like you can only define a single, global DLQ for the entire
> > > > > > > > > Kafka Streams application? What about applications that would like to
> > > > > > > > > define different DLQs for different data flows? This is especially
> > > > > > > > > important when dealing with multiple source topics that have different
> > > > > > > > > record schemas.
> > > > > > > > >
> > > > > > > > > 2.
> > > > > > > > > Your DLQ payload value can either be the record value that failed, or an
> > > > > > > > > error string (such as "error during punctuate"). This is likely to cause
> > > > > > > > > problems when users try to process the records from the DLQ, as they can't
> > > > > > > > > guarantee the format of every record value will be the same. This is very
> > > > > > > > > loosely related to point 1. above.
> > > > > > > > >
> > > > > > > > > 3.
> > > > > > > > > You provide a ProcessorContext to both exception handlers, but state they
> > > > > > > > > cannot be used to forward records. In that case, I believe you should use
> > > > > > > > > ProcessingContext instead, which statically guarantees that it can't be
> > > > > > > > > used to forward records.
> > > > > > > > >
> > > > > > > > > 4.
> > > > > > > > > You mention the KIP-1033 ProcessingExceptionHandler, but what's the plan
> > > > > > > > > if KIP-1033 is not adopted, or if KIP-1034 lands before 1033?
> > > > > > > > >
> > > > > > > > > Regards,
> > > > > > > > >
> > > > > > > > > Nick
> > > > > > > > >
> > > > > > > > > On Fri, 12 Apr 2024 at 11:38, Damien Gasparina <d.gaspar...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > >> In a general way, if the user does not configure the right ACL, that
> > > > > > > > > >> would be a security issue, but that's true for any topic.
> > > > > > > > > >>
> > > > > > > > > >> This KIP allows users to configure a Dead Letter Queue without writing
> > > > > > > > > >> custom Java code in Kafka Streams, not at the topic level.
> > > > > > > > > >> A lot of applications are already implementing this pattern, but the
> > > > > > > > > >> required code to do it is quite painful and error prone, for example
> > > > > > > > > >> most apps I have seen created a new KafkaProducer to send records to
> > > > > > > > > >> their DLQ.
> > > > > > > > > >>
> > > > > > > > > >> As it would be disabled by default for backward compatibility, I doubt
> > > > > > > > > >> it would generate any security concern.
> > > > > > > > > >> If a user explicitly configures a Dead Letter Queue, it would be up to
> > > > > > > > > >> them to configure the relevant ACLs to ensure that the right principal
> > > > > > > > > >> can access it.
> > > > > > > > > >> It is already the case for all internal, input and output Kafka
> > > > > > > > > >> Streams topics (e.g. repartition, changelog topics) that also could
> > > > > > > > > >> contain confidential data, so I do not think we should implement a
> > > > > > > > > >> different behavior for this one.
> > > > > > > > > >>
> > > > > > > > > >> In this KIP, we configured the default DLQ record to have the initial
> > > > > > > > > >> record key/value as we assume that it is the expected and wanted
> > > > > > > > > >> behavior for most applications.
> > > > > > > > > >> If a user does not want to have the key/value in the DLQ record for
> > > > > > > > > >> any reason, they could still implement exception handlers to build
> > > > > > > > > >> their own DLQ record.
> > > > > > > > > >>
> > > > > > > > > >> Regarding ACL, maybe something smarter could be done in Kafka Streams,
> > > > > > > > > >> but this is out of scope for this KIP.
> > > > > > > > >>
> > > > > > > > > >> On Fri, 12 Apr 2024 at 11:58, Claude Warren <cla...@xenei.com> wrote:
> > > > > > > > >> >
> > > > > > > > > >> > My concern is that someone would create a dead letter queue on a sensitive
> > > > > > > > > >> > topic and not get the ACL correct from the start, thus causing a potential
> > > > > > > > > >> > confidential data leak. Is there anything in the proposal that would
> > > > > > > > > >> > prevent that from happening? If so I did not recognize it as such.
> > > > > > > > >> >
> > > > > > > > > >> > On Fri, Apr 12, 2024 at 9:45 AM Damien Gasparina <d.gaspar...@gmail.com> wrote:
> > > > > > > > >> >
> > > > > > > > >> > > Hi Claude,
> > > > > > > > >> > >
> > > > > > > > > >> > > In this KIP, the Dead Letter Queue is materialized by a standard and
> > > > > > > > > >> > > independent topic, thus normal ACLs apply to it like any other topic.
> > > > > > > > > >> > > This should not introduce any security issues; obviously, the right
> > > > > > > > > >> > > ACL would need to be provided to write to the DLQ if configured.
> > > > > > > > >> > >
> > > > > > > > >> > > Cheers,
> > > > > > > > >> > > Damien
> > > > > > > > >> > >
> > > > > > > > > >> > > On Fri, 12 Apr 2024 at 08:59, Claude Warren, Jr <claude.war...@aiven.io.invalid> wrote:
> > > > > > > > >> > > >
> > > > > > > > > >> > > > I am new to the Kafka codebase so please excuse any ignorance on my part.
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > When a dead letter queue is established is there a process to ensure that
> > > > > > > > > >> > > > it at least is defined with the same ACL as the original queue? Without
> > > > > > > > > >> > > > such a guarantee at the start it seems that managing dead letter queues
> > > > > > > > > >> > > > will be fraught with security issues.
> > > > > > > > >> > > >
> > > > > > > > >> > > >
> > > > > > > > > >> > > > On Wed, Apr 10, 2024 at 10:34 AM Damien Gasparina <d.gaspar...@gmail.com> wrote:
> > > > > > > > >> > > >
> > > > > > > > >> > > > > Hi everyone,
> > > > > > > > >> > > > >
> > > > > > > > > >> > > > > To continue on our effort to improve Kafka Streams error handling, we
> > > > > > > > > >> > > > > propose a new KIP to add out of the box support for Dead Letter Queue.
> > > > > > > > > >> > > > > The goal of this KIP is to provide a default implementation that
> > > > > > > > > >> > > > > should be suitable for most applications and allow users to override
> > > > > > > > > >> > > > > it if they have specific requirements.
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > > In order to build a suitable payload, some additional changes are
> > > > > > > > > >> > > > > included in this KIP:
> > > > > > > > > >> > > > > 1. extend the ProcessingContext to hold, when available, the source
> > > > > > > > > >> > > > > node raw key/value byte[]
> > > > > > > > > >> > > > > 2. expose the ProcessingContext to the ProductionExceptionHandler,
> > > > > > > > > >> > > > > it is currently not available in the handle parameters.
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > > Regarding point 2., to expose the ProcessingContext to the
> > > > > > > > > >> > > > > ProductionExceptionHandler, we considered two choices:
> > > > > > > > > >> > > > > 1. exposing the ProcessingContext as a parameter in the handle()
> > > > > > > > > >> > > > > method. That's the cleanest way IMHO, but we would need to deprecate
> > > > > > > > > >> > > > > the old method.
> > > > > > > > > >> > > > > 2. exposing the ProcessingContext as an attribute in the interface.
> > > > > > > > > >> > > > > This way, no method is deprecated, but we would not be consistent with
> > > > > > > > > >> > > > > the other ExceptionHandler.
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > > In the KIP, we chose the 1. solution (new handle signature with old
> > > > > > > > > >> > > > > one deprecated), but we could use other opinions on this part.
> > > > > > > > > >> > > > > More information is available directly on the KIP.
> > > > > > > > >> > > > >
> > > > > > > > > >> > > > > KIP link:
> > > > > > > > > >> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams
> > > > > > > > >> > > > >
> > > > > > > > > >> > > > > Feedback and suggestions are welcome,
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > Cheers,
> > > > > > > > >> > > > > Damien, Sebastien and Loic
> > > > > > > > >> > > > >
> > > > > > > > >> > >
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > --
> > > > > > > > > >> > LinkedIn: http://www.linkedin.com/in/claudewarren
> > > > > > > > >>
> > > > > > > > >
> > > > > > >
> > > >
>
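
For illustration only, the "different DLQs for different data flows" idea discussed in the thread (a custom handler choosing the DLQ topic itself) could look roughly like the plain-Java sketch below. Everything in it is an assumption made for the example: DlqRoutingSketch, Action, Response and withDeadLetterQueueTopic are hypothetical stand-ins, not the Kafka Streams API nor the interfaces proposed in the KIP, and the "CONTINUE for known topics, FAIL otherwise" policy is just one possible choice.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: these types are NOT the Kafka Streams or KIP-1034 API.
final class DlqRoutingSketch {

    enum Action { FAIL, CONTINUE }

    // Outcome of a handler call: what to do, and where (if anywhere) to dead-letter.
    static final class Response {
        final Action action;
        final Optional<String> dlqTopic;

        private Response(Action action, Optional<String> dlqTopic) {
            this.action = action;
            this.dlqTopic = dlqTopic;
        }

        static Response of(Action action) {
            return new Response(action, Optional.empty());
        }

        Response withDeadLetterQueueTopic(String topic) {
            return new Response(action, Optional.of(topic));
        }
    }

    // Assumed mapping from source topic name to its dedicated DLQ topic name.
    private final Map<String, String> dlqBySourceTopic;

    DlqRoutingSketch(Map<String, String> dlqBySourceTopic) {
        this.dlqBySourceTopic = Map.copyOf(dlqBySourceTopic);
    }

    // Known source topic: skip the record and dead-letter it to that topic's DLQ.
    // Unknown source topic: fail without writing to any DLQ.
    Response handle(String sourceTopic, Exception error) {
        String dlq = dlqBySourceTopic.get(sourceTopic);
        if (dlq == null) {
            return Response.of(Action.FAIL);
        }
        return Response.of(Action.CONTINUE).withDeadLetterQueueTopic(dlq);
    }

    public static void main(String[] args) {
        DlqRoutingSketch handler = new DlqRoutingSketch(
                Map.of("orders", "orders-dlq", "payments", "payments-dlq"));
        Response r = handler.handle("orders", new IllegalStateException("bad record"));
        System.out.println(r.action + " -> " + r.dlqTopic.orElse("(no DLQ)"));
    }
}
```

Keeping one DLQ topic per source topic means every record in a given DLQ shares the schema of its source, which is the concern raised in points 1 and 2 of the thread.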
