Hi,

Since there has not been much activity in this thread recently, I was wondering what the status of this discussion is.

I cannot find the examples in the KIP that Sébastien mentioned in his last message to this thread. I also cannot find the corresponding definition of the following method call in the KIP:

FAIL.withDeadLetterQueueRecord(record, "dlq-topic")

I also have some comments:

B1
Did you consider prefixing the dead letter queue topic names with the application ID to distinguish the topics between Streams apps? Or is the user responsible for the differentiation? If the user is responsible, we risk that faulty records from different Streams apps end up in the same dead letter queue.
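
To make B1 concrete, here is a minimal sketch of the prefixing idea. It assumes the same "<application.id>-<name>" convention that Streams already uses for its internal repartition and changelog topics. The class and method names are hypothetical and not part of the KIP.

```java
// Hypothetical helper, not part of KIP-1034: derives a per-application DLQ topic name.
final class DlqTopicNames {
    private DlqTopicNames() { }

    // Assumption: reuse the "<application.id>-<name>" pattern of Streams internal topics.
    static String prefixedDlqTopic(final String applicationId, final String dlqTopicName) {
        if (applicationId == null || applicationId.isEmpty()) {
            throw new IllegalArgumentException("application.id must be set");
        }
        return applicationId + "-" + dlqTopicName;
    }

    public static void main(final String[] args) {
        // Two applications configured with the same DLQ name get distinct topics.
        System.out.println(prefixedDlqTopic("orders-app", "dlq"));
        System.out.println(prefixedDlqTopic("billing-app", "dlq"));
    }
}
```

Without such a prefix, both applications in the example would write to the same "dlq" topic.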

B2
Is the name of the dead letter queue topic config DEFAULT_ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG or ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG? In the KIP both names are used.

B3
What exactly is the trigger to send a record to the dead letter queue? Is it setting ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, or is it adding a record to the return value of the exception handler? What happens if I set ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG but do not add a record to the return value of the handler? What happens if I do not set ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG but add a record to the return value of the handler?

Best,
Bruno

On 4/22/24 10:19 PM, Sebastien Viale wrote:
Hi,

Thanks for your remarks.

L1. I would say "who can do the most can do the least": even though most people 
will fail and stop, we found it interesting to offer the possibility to 
fail-and-send-to-DLQ.

L2: We did not consider extending the TimestampExtractor because we consider it 
out of scope for this KIP. Perhaps it will be possible to include it in an 
ExceptionHandler later.

L3: we will include an example in the KIP, but as we mentioned earlier, the DLQ 
topic can be different in each custom Exception Handler:

When providing custom handlers, users would have the possibility to return:
* FAIL
* CONTINUE
* FAIL.withDeadLetterQueueRecord(record, "dlq-topic")
* CONTINUE.withDeadLetterQueueRecord(record, "dlq-topic")
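
The four return options could be modelled roughly as below. This is only a sketch under assumptions: RawRecord, HandlerResponse and withoutDeadLetterQueueRecord are hypothetical names, and the real proposal works with ProducerRecord<byte[], byte[]> rather than this stand-in.

```java
import java.util.Optional;

// Hypothetical stand-in for the failed record (the KIP uses byte[] keys and values).
record RawRecord(byte[] key, byte[] value) { }

// Immutable response: what to do next, plus an optional DLQ record and topic.
record HandlerResponse(Result result, Optional<RawRecord> dlqRecord, Optional<String> dlqTopic) { }

enum Result {
    FAIL, CONTINUE;

    // Plain FAIL or CONTINUE: nothing is sent to a DLQ.
    HandlerResponse withoutDeadLetterQueueRecord() {
        return new HandlerResponse(this, Optional.empty(), Optional.empty());
    }

    // Enum constants are shared singletons, so this returns a new object
    // instead of storing the record on FAIL or CONTINUE itself.
    HandlerResponse withDeadLetterQueueRecord(final RawRecord record, final String topic) {
        return new HandlerResponse(this, Optional.of(record), Optional.of(topic));
    }
}

final class ResponseOptionsSketch {
    public static void main(final String[] args) {
        final RawRecord record = new RawRecord(new byte[] {1}, new byte[] {2});
        System.out.println(Result.FAIL.withoutDeadLetterQueueRecord().dlqTopic().isPresent());
        System.out.println(Result.CONTINUE.withDeadLetterQueueRecord(record, "dlq-topic").dlqTopic().get());
    }
}
```

One design point the sketch makes visible: if the DLQ record were stored on the enum constant itself, concurrent handler calls would share it, so some per-call response object seems necessary.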

cheers !
Sébastien


________________________________
From: Lucas Brutschy <lbruts...@confluent.io.INVALID>
Sent: Monday, April 22, 2024 14:36
To: dev@kafka.apache.org <dev@kafka.apache.org>
Subject: [EXT] Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams

Hi!

Thanks for the KIP, great stuff.

L1. I was a bit confused that the default configuration (once you set
a DLQ topic) is going to be fail-and-send-to-DLQ, if I understood
correctly. Is this something that will be a common use-case, and is it
a configuration that we want to encourage? I expected that you would
either want to fail or skip-and-send-to-DLQ.

L2. Have you considered extending the `TimestampExtractor` interface
so that it can also produce to DLQ? AFAIK it's not covered by any of
the existing exception handlers, but it can cause similar failures
(potentially custom logic, depends on the validity of the input record). There
could also be a default implementation as a subclass of
`ExtractRecordMetadataTimestamp`.

L3. It would be nice to include an example of how to produce to
multiple topics in the KIP, as I can imagine that this will be a
common use-case. I wasn't sure how much code would be involved to make
it work. If a lot of code is required, we may want to consider
exposing some utils that make it easier.

Cheers,
Lucas




On Sun, Apr 21, 2024 at 7:58 PM Damien Gasparina <d.gaspar...@gmail.com> wrote:

Hi everyone,

Following all the discussion on this KIP and KIP-1033, we introduced a
new container class containing only processing context metadata:
ProcessingMetadata. This new container class is actually part of
KIP-1033, so I added a hard dependency for this KIP on KIP-1033. I
think that is the wisest choice implementation-wise.

I also clarified the interface of the enums:
withDeadLetterQueueRecords(Iterable<org.apache.kafka.clients.producer.ProducerRecord<byte[],
byte[]>> deadLetterQueueRecords). Most users would very likely just
send one DLQ record, but there might be specific use-cases, and what
can do more can do less, so I added an Iterable.

I took some time to think about the impact of storing the
ProcessingMetadata on the ProductionExceptionHandler. I think storing
the topic/offset/partition should be fine, but I am concerned about
storing the rawSourceKey/Value. I think it could impact some specific
use-cases. For example, a high-throughput Kafka Streams application
"counting" messages could have huge source input messages and very
small sink messages. Here, I assume storing the rawSourceKey/Value
could require significantly more memory than the actual Kafka Producer
buffer.

I think the safest approach is actually to only store the fixed-size
metadata for the ProductionExceptionHandler.handle:
topic/partition/offset/processorNodeId/taskId. It might be confusing
for the user, but 1) it is still better than today, where there is
no context information at all, 2) it would be clearly stated in the
javadoc, 3) the rawSourceKey/Value are already nullable (e.g. the
punctuate case).
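
To give a feeling for the memory concern, here is a rough back-of-the-envelope sketch. All numbers are assumptions chosen only for illustration (10,000 in-flight records, about 200 bytes of fixed metadata, 1 MiB of raw source key/value per record), and FixedSizeMetadata is a hypothetical name for the container described above.

```java
// Hypothetical container holding only the fixed-size metadata listed above.
record FixedSizeMetadata(String topic, int partition, long offset,
                         String processorNodeId, String taskId) { }

final class InFlightMemoryEstimate {
    // Rough retained bytes = number of in-flight records * bytes kept per record.
    static long retainedBytes(final long inFlightRecords, final long bytesPerRecord) {
        return Math.multiplyExact(inFlightRecords, bytesPerRecord);
    }

    public static void main(final String[] args) {
        final long inFlight = 10_000L;           // assumed number of unacknowledged sends
        final long metadataBytes = 200L;         // assumed size of the fixed metadata
        final long rawSourceBytes = 1_048_576L;  // assumed 1 MiB of raw source key + value

        System.out.println("metadata only:      " + retainedBytes(inFlight, metadataBytes) + " bytes");
        System.out.println("with raw key/value: "
            + retainedBytes(inFlight, metadataBytes + rawSourceBytes) + " bytes");
    }
}
```

Under these assumed numbers, the metadata alone stays around 2 MB, while keeping the raw source key/value as well would retain roughly 10.5 GB.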

Do you think it would be a suitable design, Sophie?

Cheers,
Damien

On Sun, 14 Apr 2024 at 21:30, Loic Greffier <loic.greff...@michelin.com> wrote:

Hi Sophie,

Thanks for your feedback.
Complementing Damien's comments here for points S1 and S5B.

S1:
I'm confused -- are you saying that we're introducing a new kind of 
ProducerRecord class for this?

I am wondering if it makes sense to alter the ProducerRecord from Clients API with a 
"deadLetterQueueTopicName" attribute dedicated to Kafka Streams DLQ.
Adding "deadLetterQueueTopicName" as an additional parameter to 
"withDeadLetterQueueRecord" is a good option, and may allow users to send records to 
different DLQ topics depending on conditions:
@Override
public ProductionExceptionHandlerResponse handle(final ProcessingContext context,
                                                 ProducerRecord<byte[], byte[]> record,
                                                 Exception exception) {
    if (condition1) {
        return ProductionExceptionHandlerResponse.CONTINUE
            .withDeadLetterQueueRecord(record, "dlq-topic-a");
    }
    if (condition2) {
        return ProductionExceptionHandlerResponse.CONTINUE
            .withDeadLetterQueueRecord(record, "dlq-topic-b");
    }
    return ProductionExceptionHandlerResponse.CONTINUE
        .withDeadLetterQueueRecord(record, "dlq-topic-c");
}

S5B:
I was having a bit of trouble understanding what the behavior would be if someone 
configured a "errors.deadletterqueue.topic.name" but didn't implement the 
handlers.

The provided LogAndContinueExceptionHandler, LogAndFailExceptionHandler and 
DefaultProductionExceptionHandler should be able to tell if records should be 
sent to DLQ or not.
The "errors.deadletterqueue.topic.name" config serves two purposes:

* Specifying whether the provided handlers should send records to DLQ:
  * If the value is empty, the handlers should not send records to DLQ.
  * If the value is not empty, the handlers should send records to DLQ.
* Defining the name of the DLQ topic that should be used by the provided handlers.

Thus, if "errors.deadletterqueue.topic.name" is defined, the provided handlers 
should return either:

* CONTINUE.withDeadLetterQueueRecord(record, defaultDeadLetterQueue)
* FAIL.withDeadLetterQueueRecord(record, defaultDeadLetterQueue)

If "errors.deadletterqueue.topic.name" is defined but neither 
DeserializationExceptionHandler nor ProductionExceptionHandler classes are defined in the 
configuration, then nothing should happen, as sending to DLQ is based on the handlers’ 
response.

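
As a rough illustration of how a provided handler could react to the config, here is a minimal sketch. It assumes the handler receives the configured topic name as a plain String; BundledHandlerSketch, Response and logAndContinue are hypothetical names, not the KIP's API.

```java
import java.util.Optional;

final class BundledHandlerSketch {
    enum Result { FAIL, CONTINUE }

    // The handler decision plus the DLQ topic to write to, if any.
    record Response(Result result, Optional<String> dlqTopic) { }

    // Models a log-and-continue style handler: the value of
    // "errors.deadletterqueue.topic.name" alone decides whether a DLQ record
    // accompanies the CONTINUE response.
    static Response logAndContinue(final String configuredDlqTopic) {
        if (configuredDlqTopic == null || configuredDlqTopic.isEmpty()) {
            return new Response(Result.CONTINUE, Optional.empty());
        }
        return new Response(Result.CONTINUE, Optional.of(configuredDlqTopic));
    }

    public static void main(final String[] args) {
        System.out.println(logAndContinue(""));            // no DLQ topic configured
        System.out.println(logAndContinue("my-app-dlq"));  // DLQ topic configured
    }
}
```

A log-and-fail variant would look the same with Result.FAIL. Custom handlers are not covered by this sketch, since they choose their own response.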
When providing custom handlers, users would have the possibility to return:

* FAIL
* CONTINUE
* FAIL.withDeadLetterQueueRecord(record, "dlq-topic")
* CONTINUE.withDeadLetterQueueRecord(record, "dlq-topic")

A DLQ topic name is currently required when using the last two response types.
I am wondering if it could benefit users to ease the use of the default DLQ topic 
"errors.deadletterqueue.topic.name" when implementing custom handlers, with 
this kind of implementation:

* FAIL.withDefaultDeadLetterQueueRecord(record)
* CONTINUE.withDefaultDeadLetterQueueRecord(record)

Regards,
Loïc

From: Damien Gasparina <d.gaspar...@gmail.com>
Sent: Sunday, April 14, 2024 20:24
To: dev@kafka.apache.org
Subject: [EXT] Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams

Hi Sophie,

Thanks a lot for your feedback and your detailed comments.

S1.
I'm confused -- are you saying that we're introducing a new kind of
ProducerRecord class for this?

Sorry for the poor wording, that's not what I meant. While writing the
KIP, I was hesitating between 1. leveraging the Kafka Producer
ProducerRecord, 2. the Kafka Streams ProducerRecord + a topic name in
a separate parameter, 3. a new custom interface (e.g.
DeadLetterQueueRecord).
As the KafkaProducer ProducerRecord is not used in the Kafka Streams
API (except ProductionExceptionHandler) and I would like to avoid a
new interface if not strictly required, I leaned toward option 2.
Thinking about it, maybe option 1. would be best, but I assume it
could create confusion with KafkaStreams ProducerRecord. Let me sleep
on it.

S2. I agree. Following the discussion in KIP-1033 and KIP-1034 and
your point in S4, it seems more and more likely that we will create a
new container class containing only the metadata for the exception
handlers. To be consistent, I think we should use this new
implementation in all exception handlers.
The only issue I could think of is that the new interface would
expose less data than the current ProcessorContext in the
DeserializationExceptionHandler (e.g. stateDir(), metrics(), getStateStore()),
thus it could be hard for some users to migrate to the new interface.
I do expect that only a few users would be impacted as the javadoc is
very clear: `Note, that the passed in {@link ProcessorContext} only
allows access to metadata like the task ID.`

S3. I completely agree with you, it is something that might not be
trivial and should be thoroughly covered by unit tests during the
implementation.

S4. Good point, I did not notice that the ProductionExceptionHandler
is also invoked in the producer.send() callback.
Capturing the ProcessingContext for each in-flight message is probably
not possible. I think there is no other way than to write a custom
container class holding only the essential metadata. I am
thinking of storing the following attributes: source topic, partition,
offset, rawKey, rawValue and taskId.
This metadata should be relatively small, but I assume that there
could be a high number of in-flight messages, especially with the
at-least-once processing guarantee. Do you think it would be fine memory-wise?

S5. As many exceptions are only accessible in exception handlers, and
we wanted to 1) allow users to customize the DLQ records and 2) have a
suitable DLQ out of the box implementation, we felt it natural to rely
on exception handlers, that's also why we created KIP-1033.
Piggybacking on the enum response was the cleanest way we could think
of, but we are completely open to suggestions.

S5a. Completely agree with you on this point, for this DLQ approach to
be complete, the ProcessingExceptionHandler introduced in KIP-1033 is
required. KIP-1033 is definitely our first priority. We decided to
kick-off the KIP-1034 discussion as we expected the discussions to be
dynamic and could potentially impact some choices of KIP-1033.

S5b. In this KIP, we wanted to 1. provide as much flexibility to the
user as possible; 2. provide a good default implementation
for the DLQ without having to write custom exception handlers.
For the default implementation, we introduced a new configuration:
errors.deadletterqueue.topic.name.

If this configuration is set, it changes the behavior of the provided
exception handlers to return a DLQ record containing the raw key/value
+ headers + exception metadata in headers.
If the out-of-the-box implementation is not suitable for a user, e.g.
the payload needs to be masked in the DLQ, they could implement their
own exception handlers. The errors.deadletterqueue.topic.name would
only impact Kafka Streams bundled exception handlers (e.g.
org.apache.kafka.streams.errors.LogAndContinueExceptionHandler).
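
For illustration, the "headers + exception metadata in headers" part could look roughly like the sketch below. The header names are placeholders invented for this example, since this thread does not spell out the real ones. A Map is also a simplification, because Kafka headers allow repeated keys.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

final class DefaultDlqRecordSketch {
    // Hypothetical header names, invented for this example only.
    static final String EXCEPTION_CLASS = "example.errors.exception.class";
    static final String EXCEPTION_MESSAGE = "example.errors.exception.message";

    // Copies the source headers and appends the exception metadata as extra headers.
    // The input map is left untouched.
    static Map<String, byte[]> dlqHeaders(final Map<String, byte[]> sourceHeaders,
                                          final Exception exception) {
        final Map<String, byte[]> headers = new LinkedHashMap<>(sourceHeaders);
        headers.put(EXCEPTION_CLASS, utf8(exception.getClass().getName()));
        headers.put(EXCEPTION_MESSAGE, utf8(String.valueOf(exception.getMessage())));
        return headers;
    }

    private static byte[] utf8(final String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(final String[] args) {
        final Map<String, byte[]> source = new LinkedHashMap<>();
        source.put("correlation-id", utf8("abc-123"));
        final Map<String, byte[]> dlq = dlqHeaders(source, new IllegalStateException("boom"));
        System.out.println(dlq.keySet());
    }
}
```

A user who needs to mask the payload or add further headers would do the equivalent in a custom handler instead.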

Let me update the KIP to make it clear and also provide examples.

S6/S7. Good point, mea culpa for the camel case, it must have been a
sugar rush :)

Thanks again for your detailed comments and pointing out S4
(production exception & Processing Context)!

Cheers,
Damien




On Fri, 12 Apr 2024 at 22:04, Sophie Blee-Goldman <sop...@responsive.dev> wrote:

Thanks for the KIP, this will make a lot of people very happy.

Wanted to chime in on a few points that have been raised so far and add
some of my own (numbering with an S to distinguish my points from the
previous ones)

S1.

1.a I really meant ProducerRecord, that's the class used to forward to
downstream processors in the PAPI. The only information missing in
this class is the topic name. I also considered relying on the Kafka
Producer ProducerRecord, but I assume it would not be consistent with
the KafkaStreams API.

I'm confused -- are you saying that we're introducing a new kind of
ProducerRecord class for this? Why not just use the existing one, ie the
o.a.k.clients.producer.ProducerRecord class? This is what the
ProductionExceptionHandler uses, so it's definitely "consistent". In other
words, we can remove the "String deadLetterQueueTopicName"

S2.
I think this would be a good opportunity to also deprecate the existing
#handle method of the DeserializationExceptionHandler, and replace it with
one that uses a ProcessingContext instead of the ProcessorContext. Partly
for the same reasons about guarding access to the #forward methods, partly
because this method needs to be migrated to the new PAPI interface
anyways, and ProcessingContext is part of the new one.

S3.
Regarding 2a. -- I'm inclined to agree that records which a Punctuator
failed to produce should also be sent to the DLQ via the
ProductionExceptionHandler. Users will just need to be careful about
accessing certain fields of the ProcessingContext that aren't available in
the punctuator, and need to check the Optional returned by the
ProcessingContext#recordMetadata API.
Also, from an implementation standpoint, it will be really hard to
distinguish between a record created by a punctuator vs a processor from
within the RecordCollector, which is the class that actually handles
sending records to the Streams Producer and invoking the
ProductionExceptionHandler. This is because the RecordCollector is at the
"end" of the topology graph and doesn't have any context about which of the
upstream processors actually attempted to forward a record.

This in itself is at least theoretically solvable, but it leads into my
first major new point:

S4:
I'm deeply worried about passing the ProcessingContext in as a means of
forwarding metadata. The problem is that the processing/processor context
is a mutable class and is inherently meaningless outside the context of a
specific task. And when I said earlier that the RecordCollector sits at
the "end" of the topology, I meant that it's literally outside the task's
subtopology and is used/shared by all tasks on that StreamThread. So to
begin with, there's no guarantee what will actually be returned for
essential methods such as the new #rawSourceKey/Value or the existing
#recordMetadata

For serialization exceptions it'll probably be correct, but for general
send errors it almost definitely won't be. In short, this is because we
send records to the producer after the sink node, but don't check for send
errors right away since obviously it takes some time for the producer to
actually send. In other words, sending/producing records is actually done
asynchronously with processing, and we simply check for errors on any
previously-sent records
during the send on a new record in a sink node. This means the context we
would be passing in to a (non-serialization) exception would pretty much
always correspond not to the record that experienced the error, but to the
random record that happened to be being sent when we checked and saw the
error for the failed record.

This discrepancy, in addition to the whole "sourceRawKey/Value and
recordMetadata are null for punctuators" issue, seems like an
insurmountable inconsistency that is more likely to cause users confusion
or problems than be helpful.
We could create a new metadata object and copy over the relevant info from
the ProcessingContext, but I worry that has the potential to explode memory
since we'd need to hold on to it for all in-flight records up until they
are either successfully sent or failed and passed in to the
ProductionExceptionHandler. But if the metadata is relatively small, it's
probably fine. Especially if it's just the raw source key/value. Are
there any other parts of the ProcessingContext you think should be made
available?
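
To illustrate the mismatch, here is a toy simulation, not the actual RecordCollector logic. It assumes send errors are only noticed while sending a later record, and compares what a shared mutable context would report with what metadata copied at send time would report. All names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class AsyncSendContextSketch {
    // A pending send remembers the source offset it was created for (the copied metadata).
    record PendingSend(long capturedOffset, boolean willFail) { }

    // One reported error: offset seen in the shared context vs. offset captured at send time.
    record Report(long offsetFromContext, long offsetFromCapturedMetadata) { }

    static List<Report> run() {
        final Queue<PendingSend> inFlight = new ArrayDeque<>();
        final List<Report> reports = new ArrayList<>();
        long contextOffset; // stands in for the mutable processing context

        for (long offset = 0; offset < 3; offset++) {
            contextOffset = offset; // the context now points at the current record
            // Errors of previously sent records are only noticed during a later send.
            while (!inFlight.isEmpty()) {
                final PendingSend done = inFlight.poll();
                if (done.willFail()) {
                    reports.add(new Report(contextOffset, done.capturedOffset()));
                }
            }
            inFlight.add(new PendingSend(offset, offset == 0)); // only offset 0 fails
        }
        return reports;
    }

    public static void main(final String[] args) {
        System.out.println(run());
    }
}
```

In this toy run, the send for offset 0 fails, but the error is observed while offset 1 is being processed, so the shared context points at the wrong record while the captured metadata still says 0.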

Note that this only applies to the ProductionExceptionHandler, as the
DeserializationExceptionHandler (and the newly proposed
ProcessingExceptionHandler) would both be invoked immediately and therefore
with the failed record's context. However, I'm also a bit uncomfortable
with adding the rawSourceKey/rawSourceValue to the ProcessingContext. So
I'd propose to just wrap those (and any other metadata you might want) in a
container class and pass that in instead of the ProcessingContext, to all
of the exception handlers.

S5:
For some reason I'm finding the proposed API a little bit awkward, although
it's entirely possible that the problem is with me, not the proposal :)
Specifically I'm struggling with the approach of piggybacking on the
exception handlers and their response enums to dictate how records are
forwarded to the DLQ. I think this comes down to two things, though again,
these aren't necessarily problems with the API and probably just need to be
hashed out:

S5a.
When I envision a DLQ, to me, the most common use case would be to forward
input records that failed somewhere along the processing graph. But it
seems like all the focus here is on the two far ends of the subtopology --
the input/consumer, and the output/producer. I get that
the ProcessingExceptionHandler is really the missing piece here, and it's
hard to say anything specific since it's not yet accepted, but maybe a
somewhat more concrete example would help. FWIW I think/hope to get that
KIP accepted and implemented ASAP, so I'm not worried about the "what if
it doesn't happen" case -- more just want to know what it will look like
when it does. Imo it's fine to build KIPs on top of future ones, it feels
clear that this part will just have to wait for that KIP to actually be
added.

S5b:
Why do users have to define the entire ProducerRecord -- shouldn't Streams
handle all this for them? Or will we just automatically send every record
on failure to the default global DLQ, and users only have to implement the
handlers if they want to change the headers or send to a different topic? I
was having a bit of trouble understanding what the behavior would be if
someone configured a "errors.deadletterqueue.topic.name" but didn't
implement the handlers. Apologies if it's somewhere in the KIP and I
happened to miss it!

Either way, I really think an example would help me to better imagine what
this will look like in practice, and evaluate whether it actually involves
as much overhead as I'm worried it will. Can you add a section that
includes a basic implementation of all the features here? Nothing too
complicated, just the most bare-bones code needed to actually implement
forwarding to a dead-letter-queue via the handlers.

Lastly, two super small things:

S6:
We use camel case in Streams, so it should be rawSourceKey/Value rather
than raw_source_key/value

S7:
Can you add javadocs for the #withDeadLetterQueueRecord? For example, it
seems to me that if the topic to be sent to here is different than the
default/global DLQ, then the user will need to make sure to have created
this themselves up front.

That's it from me...sorry for the long response, it's just because I'm
excited for this feature and have been waiting on a KIP for this for years.

Cheers,
Sophie


On Fri, Apr 12, 2024 at 11:10 AM Damien Gasparina <d.gaspar...@gmail.com> wrote:

Hi Andrew,

Thanks a lot for your review, plenty of good points!

11. Typo fixed, good catch.

12. I do agree with you and Nick also mentioned it, I updated the KIP
to mention that context headers should be forwarded.

13. Good catch, to be consistent with KIP-298, and without a strong
opinion from my side, I updated the KIP with your prefix proposal.

14. I am not sure about this point, a big difference between KIP-298
and this one is that the handlers can easily be overridden, something
that is not doable in Kafka Connect.
If someone would like a different behavior, e.g. to mask the payload
or include further headers, I think we should encourage them to write
their own exception handlers to build the DLQ Record the way they
expect.

15. Yeah, that's a good point, I was not fully convinced about putting
a String in it, I do assume that "null" is also a valid value. I do
assume that the Stacktrace and the Exception in this case are the key
metadata for the user to troubleshoot the problem.
I updated the KIP to mention that the value should be null if
triggered in a punctuate.

16. I added a section to mention that Kafka Streams would not try to
automatically create the topic and the topic should either be
automatically created, or pre-created.

17. If a DLQ record can not be sent, the exception should go to the
uncaughtExceptionHandler. Let me clearly state it in the KIP.

On Fri, 12 Apr 2024 at 17:25, Damien Gasparina <d.gaspar...@gmail.com> wrote:

Hi Nick,

1. Good point, that's less impactful than a custom interface, I just
updated the KIP with the new signature.

1.a I really meant ProducerRecord, that's the class used to forward to
downstream processors in the PAPI. The only information missing in
this class is the topic name. I also considered relying on the Kafka
Producer ProducerRecord, but I assume it would not be consistent with
the KafkaStreams API.

2. Agreed

2.a I do think exceptions occurring during punctuate should be
included in the DLQ.
Even if building a suitable payload is almost impossible, even with
custom code; those exceptions are still fatal for Kafka Streams by
default and are something that can not be ignored safely.
I do assume that most users would want to be informed if an error
happened during a punctuate, even if only the metadata (e.g.
stacktrace, exception) is provided.
I am only concerned about flooding the DLQ topic as, if a scheduled
operation failed, it will very likely fail again during the next
invocation, but

4. Good point, I clarified the wording in the KIP to make it explicit.

5. Good point, I will clearly mention that it is out of scope as part
of the KIP and might not be as trivial as people could expect. I will
update the KIP once I do have some spare time.

6. Oh yeah, I didn't think about it, but forwarding input headers
would definitely make sense. Confluent Schema Registry ID is actually
part of the payload, but many correlation IDs and technical metadata
are passed through headers, so it makes sense to forward them, especially
as it is the default behavior of Kafka Streams.



On Fri, 12 Apr 2024 at 15:25, Nick Telford <nick.telf...@gmail.com> wrote:

Hi Damien and Sebastien,

1.
I think you can just add a `String topic` argument to the existing
`withDeadLetterQueueRecord(ProducerRecord<byte[], byte[]>
deadLetterQueueRecord)` method, and then the implementation of the
exception handler could choose the topic to send records to using
whatever logic the user desires. You could perhaps provide a built-in
implementation that leverages your new config to send all records to
an untyped DLQ topic?

1a.
BTW you have a typo: in your DeserializationExceptionHandler, the type
of your `deadLetterQueueRecord` argument is `ProducerRecord`, when it
should probably be `ConsumerRecord`.

2.
Agreed. I think it's a good idea to provide an implementation that
sends to a single DLQ by default, but it's important to enable users to
customize this with their own exception handlers.

2a.
I'm not convinced that "errors" (e.g. failed punctuate) should be sent
to a DLQ topic like it's a bad record. To me, a DLQ should only contain
records that failed to process. I'm not even sure how a user would
re-process/action one of these other errors; it seems like the purview
of error logging to me?

4.
My point here was that I think it would be useful for the KIP to
contain an explanation of the behavior both with KIP-1033 and without
it, i.e. clarify if/how records that throw an exception in a processor
are handled. At the moment, I'm assuming that without KIP-1033,
processing exceptions would not cause records to be sent to the DLQ,
but with KIP-1033, they would. If this assumption is correct, I think
it should be made explicit in the KIP.

5.
Understood. You may want to make this explicit in the documentation for
users, so they understand the consequences of re-processing data sent
to their DLQ. The main reason I raised this point is it's something
that's tripped me up in numerous KIPs that committers frequently remind
me of; so I wanted to get ahead of it for once! :D

And one new point:
6.
The DLQ record schema appears to discard all custom headers set on the
source record. Is there a way these can be included? In particular, I'm
concerned with "schema pointer" headers (like those set by Schema
Registry), that may need to be propagated, especially if the records
are fed back into the source topics for re-processing by the user.

Regards,
Nick


On Fri, 12 Apr 2024 at 13:20, Damien Gasparina <d.gaspar...@gmail.com> wrote:

Hi Nick,

Thanks a lot for your review and your useful comments!

1. It is a good point, as you mentioned, I think it would make sense
in some use cases to have potentially multiple DLQ topics, so we
should provide an API to let users do it.
Thinking out loud here, maybe it is a better approach to create a new
Record class containing the topic name, e.g. DeadLetterQueueRecord,
and change the signature to
withDeadLetterQueueRecords(Iterable<DeadLetterQueueRecord>
deadLetterQueueRecords) instead of
withDeadLetterQueueRecord(ProducerRecord<byte[], byte[]>
deadLetterQueueRecord). What do you think? DeadLetterQueueRecord
would be something like "class DeadLetterQueueRecord extends
org.apache.kafka.streams.processor.api.ProducerRecords { String
topic; /* + getter/setter + */ }".

2. I think the root question here is: should we have one DLQ topic or
multiple DLQ topics by default? This question highly depends on the
context, but implementing a default implementation to handle multiple
DLQ topics would be opinionated, e.g. how to manage errors in a
punctuate?
I think it makes sense to have the default implementation write all
faulty records to a single DLQ; that's at least the approach I used
in past applications: one DLQ per Kafka Streams application. Of course
the message format could change in the DLQ, e.g. due to the source
topic, but those DLQ records will very likely be troubleshot, and
maybe replayed, manually anyway.
If a user needs to have multiple DLQ topics or wants to enforce a
specific schema, it's still possible, but they would need to
implement custom Exception Handlers.
Coming back to 1. I do agree that it would make sense to have the
user set the DLQ topic name in the handlers for more flexibility.

3. Good point, sorry it was a typo, the ProcessingContext makes much
more sense here indeed.

4. I do assume that we could implement KIP-1033 (Processing exception
handler) independently from KIP-1034. I do hope that KIP-1033 would
be adopted and implemented before KIP-1034, but if that's not the case,
we could implement KIP-1034 independently and update KIP-1033 to include
the DLQ record afterward (in the same KIP or in a new one if not
possible).

5. I think we should be clear that this KIP only covers the DLQ
record produced.
Everything related to replaying messages or recovery plans should be
considered out-of-scope as it is use-case and error specific.

Let me know if that's not clear, there are definitely points that
are highly debatable.

Cheers,
Damien

On Fri, 12 Apr 2024 at 13:00, Nick Telford <nick.telf...@gmail.com> wrote:

Oh, and one more thing:

5.
Whenever you take a record out of the stream, and then potentially
re-introduce it at a later date, you introduce the potential for
record ordering issues. For example, that record could have been
destined for a Window that has been closed by the time it's
re-processed. I'd like to see a section that considers these
consequences, and perhaps make those risks clear to users. For the
record, this is exactly what sunk KIP-990, which was an alternative
approach to error handling that introduced the same issues.
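
The closed-window case can be sketched with a little arithmetic. This is a simplified model, assuming tumbling windows, non-negative timestamps, and a window that closes once stream time reaches window end plus grace period. The class and method names are hypothetical.

```java
final class LateReplaySketch {
    // End (exclusive) of the tumbling window that contains the timestamp.
    static long windowEnd(final long timestampMs, final long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs) + windowSizeMs;
    }

    // Simplified model: the window is closed once stream time has reached
    // window end + grace, so a record replayed after that point is dropped.
    static boolean wouldBeDropped(final long recordTimestampMs, final long windowSizeMs,
                                  final long graceMs, final long streamTimeMs) {
        return windowEnd(recordTimestampMs, windowSizeMs) + graceMs <= streamTimeMs;
    }

    public static void main(final String[] args) {
        final long recordTs = 65_000L;    // record from the window [60s, 120s)
        final long windowSize = 60_000L;  // 1 minute tumbling windows
        final long grace = 10_000L;       // 10 seconds grace period

        // Replayed quickly, while the window is still open.
        System.out.println(wouldBeDropped(recordTs, windowSize, grace, 125_000L));
        // Replayed from the DLQ an hour later, when the window is long closed.
        System.out.println(wouldBeDropped(recordTs, windowSize, grace, 3_600_000L));
    }
}
```

With these example numbers, the window closes at 130 seconds of stream time, so a replay at one hour would no longer update it.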

Cheers,

Nick

On Fri, 12 Apr 2024 at 11:54, Nick Telford <nick.telf...@gmail.com> wrote:

Hi Damien,

Thanks for the KIP! Dead-letter queues are something that I think a
lot of users would like.

I think there are a few points with this KIP that concern me:

1.
It looks like you can only define a single, global DLQ for the entire
Kafka Streams application? What about applications that would like to
define different DLQs for different data flows? This is especially
important when dealing with multiple source topics that have different
record schemas.

2.
Your DLQ payload value can either be the record value that failed, or
an error string (such as "error during punctuate"). This is likely to
cause problems when users try to process the records from the DLQ, as
they can't guarantee the format of every record value will be the same.
This is very loosely related to point 1. above.

3.
You provide a ProcessorContext to both exception handlers, but state
they cannot be used to forward records. In that case, I believe you
should use ProcessingContext instead, which statically guarantees that
it can't be used to forward records.

4.
You mention the KIP-1033 ProcessingExceptionHandler, but what's the
plan if KIP-1033 is not adopted, or if KIP-1034 lands before 1033?

Regards,

Nick

On Fri, 12 Apr 2024 at 11:38, Damien Gasparina <d.gaspar...@gmail.com> wrote:

In a general way, if the user does not configure the right ACL, that
would be a security issue, but that's true for any topic.

This KIP allows users to configure a Dead Letter Queue without writing
custom Java code in Kafka Streams, not at the topic level.
A lot of applications are already implementing this pattern, but the
required code to do it is quite painful and error prone, for example
most apps I have seen created a new KafkaProducer to send records to
their DLQ.

As it would be disabled by default for backward compatibility, I doubt
it would generate any security concern.
If a user explicitly configures a Dead Letter Queue, it would be up to
them to configure the relevant ACLs to ensure that the right principal
can access it.
It is already the case for all internal, input and output Kafka
Streams topics (e.g. repartition, changelog topics) that also could
contain confidential data, so I do not think we should implement a
different behavior for this one.

In this KIP, we configured the default DLQ record to have the initial
record key/value as we assume that it is the expected and wanted
behavior for most applications.
If a user does not want to have the key/value in the DLQ record for
any reason, they could still implement exception handlers to build
their own DLQ record.

Regarding ACL, maybe something smarter could be done in Kafka Streams,
but this is out of scope for this KIP.

On Fri, 12 Apr 2024 at 11:58, Claude Warren <cla...@xenei.com> wrote:

My concern is that someone would create a dead letter queue on a
sensitive topic and not get the ACL correct from the start, thus
causing a potential confidential data leak. Is there anything in the
proposal that would prevent that from happening? If so I did not
recognize it as such.

On Fri, Apr 12, 2024 at 9:45 AM Damien Gasparina <d.gaspar...@gmail.com> wrote:

Hi Claude,

In this KIP, the Dead Letter Queue is materialized by a standard and
independent topic, thus normal ACL applies to it like any other topic.
This should not introduce any security issues; obviously, the right
ACL would need to be provided to write to the DLQ if configured.

Cheers,
Damien

On Fri, 12 Apr 2024 at 08:59, Claude Warren, Jr
<claude.war...@aiven.io.invalid> wrote:

I am new to the Kafka codebase so please excuse any ignorance on my
part.

When a dead letter queue is established is there a process to ensure
that it at least is defined with the same ACL as the original queue?
Without such a guarantee at the start it seems that managing dead
letter queues will be fraught with security issues.


On Wed, Apr 10, 2024 at 10:34 AM Damien Gasparina <d.gaspar...@gmail.com> wrote:

Hi everyone,

To continue on our effort to improve Kafka Streams error handling, we
propose a new KIP to add out of the box support for Dead Letter Queue.
The goal of this KIP is to provide a default implementation that
should be suitable for most applications and allow users to override
it if they have specific requirements.

In order to build a suitable payload, some additional changes are
included in this KIP:
1. extend the ProcessingContext to hold, when available, the source
node raw key/value byte[]
2. expose the ProcessingContext to the ProductionExceptionHandler,
it is currently not available in the handle parameters.

Regarding point 2., to expose the ProcessingContext to the
ProductionExceptionHandler, we considered two choices:
1. exposing the ProcessingContext as a parameter in the handle()
method. That's the cleanest way IMHO, but we would need to deprecate
the old method.
2. exposing the ProcessingContext as an attribute in the interface.
This way, no method is deprecated, but we would not be consistent with
the other ExceptionHandler.

In the KIP, we chose the 1. solution (new handle signature with old
one deprecated), but we could use other opinions on this part.
More information is available directly on the KIP.

KIP link:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams

Feedback and suggestions are welcome,

Cheers,
Damien, Sebastien and Loic




--
LinkedIn: http://www.linkedin.com/in/claudewarren





