Hi Bruno,

We focused our effort (well, mostly Seb and Loic :)) on KIP-1033,
that's why not much progress has been made on this one yet.
Regarding your points:

B1: It is up to the user to specify the DLQ topic name and to
implement a potential differentiation. I tend to think that having one
DLQ per application ID is the wisest, but I encountered cases and
applications that preferred having a shared DLQ topic between multiple
applications, e.g. to reduce the number of partitions, or to ease
monitoring

B2 : Goot catch, it should be
ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, we removed the "DEFAULT"
prefix during the discussion, looks like I forgot to update all
occurrences in the KIP.

B3 :The trigger for sending to the DLQ would be if
ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG is set OR if the user
implemented a custom exception handler that returns DLQ records.
ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG would just override the
behavior of the default handler, thus custom exception handlers will
completely ignore this parameter.

I think it's a good trade-off between providing a production-ready
default implementation, yet providing sufficient flexibility for
complex use-cases.
This behavior definitely needs to be documented, but I guess it's safe
to push the responsibility of the DLQ records to the user if they
implement custom handlers.

Cheers,
Damien


On Tue, 11 Jun 2024 at 17:00, Bruno Cadonna <cado...@apache.org> wrote:
>
> Hi,
>
> since there was not too much activity in this thread recently, I was
> wondering what the status of this discussion is.
>
> I cannot find the examples in the KIP Sébastien mentioned in the last
> message to this thread. I can also not find the corresponding definition
> of the following method call in the KIP:
>
> FAIL.withDeadLetterQueueRecord(record, "dlq-topic")
>
> I have also some comments:
>
> B1
> Did you consider to prefix the dead letter queue topic names with the
> application ID to distinguish the topics between Streams apps? Or is the
> user responsible for the differentiation? If the user is responsible, we
> risk that faulty records of different Streams apps end up in the same
> dead letter queue.
>
> B2
> Is the name of the dead letter queue topic config
> DEFAULT_ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG or
> ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG? In the KIP both names are used.
>
> B3
> What is exactly the trigger to send a record to the dead letter queue?
> Is setting ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG or is it adding a
> record to the return value of the exception handler?
> What happens if I set ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG but do
> not add a record to the return value of the handler? What happens if I
> do not set ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG but add a record to
> the return value of the handler?
>
> Best,
> Bruno
>
> On 4/22/24 10:19 PM, Sebastien Viale wrote:
> > Hi,
> >
> > Thanks for your remarks
> >
> > L1. I would say "who can do the most can do the least", even though most 
> > people will fail and stop, we found it interesting to offer the possibility 
> > to fail-and-send-to-DLQ
> >
> > L2: We did not consider extending the TimestampExtractor because we 
> > estimate it out of scope for this KIP. Perhaps it will be possible to 
> > include it in an ExceptionHandler later.
> >
> > L3: we will include an example in the KIP, but as we mentioned earlier, the 
> > DLQ topic can be different in each custom Exception Handler:
> >
> > When providing custom handlers, users would have the possibility to return:
> >   * FAIL
> > * CONTINUE
> > * FAIL.withDeadLetterQueueRecord(record, "dlq-topic")
> > * CONTINUE.withDeadLetterQueueRecord(record, "dlq-topic")
> >
> > cheers !
> > Sébastien
> >
> >
> > ________________________________
> > De : Lucas Brutschy <lbruts...@confluent.io.INVALID>
> > Envoyé : lundi 22 avril 2024 14:36
> > À : dev@kafka.apache.org <dev@kafka.apache.org>
> > Objet : [EXT] Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
> >
> > Warning External sender Do not click on any links or open any attachments 
> > unless you trust the sender and know the content is safe.
> >
> > Hi!
> >
> > Thanks for the KIP, great stuff.
> >
> > L1. I was a bit confused that the default configuration (once you set
> > a DLQ topic) is going to be fail-and-send-to-DLQ, if I understood
> > correctly. Is this something that will be a common use-case, and is it
> > a configuration that we want to encourage? It expected that you either
> > want to fail or skip-and-send-to-DLQ.
> >
> > L2. Have you considered extending the `TimestampExtractor` interface
> > so that it can also produce to DLQ? AFAIK it's not covered by any of
> > the existing exception handlers, but it can cause similar failures
> > (potentially custom logic, depends on validity input record). There
> > could also be a default implementation as a subclass of
> > `ExtractRecordMetadataTimestamp`.
> >
> > L3. It would be nice to include an example of how to produce to
> > multiple topics in the KIP, as I can imagine that this will be a
> > common use-case. I wasn't sure how much code would be involved to make
> > it work. If a lot of code is required, we may want to consider
> > exposing some utils that make it easier.
> >
> > Cheers,
> > Lucas
> >
> > This email was screened for spam and malicious content but exercise caution 
> > anyway.
> >
> >
> >
> > On Sun, Apr 21, 2024 at 7:58 PM Damien Gasparina <d.gaspar...@gmail.com> 
> > wrote:
> >>
> >> Hi everyone,
> >>
> >> Following all the discussion on this KIP and KIP-1033, we introduced a
> >> new container class containing only processing context metadata:
> >> ProcessingMetadata. This new container class is actually part of
> >> KIP-1033, thus, I added a hard dependency for this KIP on KIP-1033, I
> >> think it's the wisest implementation wise.
> >>
> >> I also clarified the interface of the enums:
> >> withDeadLetterQueueRecords(Iterable<org.apache.kafka.clients.producer.ProducerRecord<byte[],
> >> byte[]>> deadLetterQueueRecords) . Very likely most users would just
> >> send one DLQ record, but there might be specific use-cases and what
> >> can do more can do less, so I added an Iterable.
> >>
> >> I took some time to think about the impact of storing the
> >> ProcessingMetadata on the ProductionExceptionHandler. I think storing
> >> the topic/offset/partition should be fine, but I am concerned about
> >> storing the rawSourceKey/Value. I think it could impact some specific
> >> use-cases, for example, a high-throughput Kafka Streams application
> >> "counting" messages could have huge source input messages, and very
> >> small sink messages, here, I assume storing the rawSourceKey/Value
> >> could significantly require more memory than the actual Kafka Producer
> >> buffer.
> >>
> >> I think the safest approach is actually to only store the fixed-size
> >> metadata for the ProductionExceptionHandler.handle:
> >> topic/partition/offset/processorNodeId/taskId, it might be confusing
> >> for the user, but 1) it is still better than nowaday where there are
> >> no context information at all, 2) it would be clearly stated in the
> >> javadoc, 3) the rawSourceKey/Value are already nullable (e.g. the
> >> punctuate case). .
> >>
> >> Do you think it would be a suitable design Sophie?
> >>
> >> Cheers,
> >> Damien
> >>
> >> On Sun, 14 Apr 2024 at 21:30, Loic Greffier <loic.greff...@michelin.com> 
> >> wrote:
> >>>
> >>> Hi Sophie,
> >>>
> >>> Thanks for your feedback.
> >>> Completing the Damien's comments here for points S1 and S5B.
> >>>
> >>> S1:
> >>>> I'm confused -- are you saying that we're introducing a new kind of 
> >>>> ProducerRecord class for this?
> >>>
> >>> I am wondering if it makes sense to alter the ProducerRecord from Clients 
> >>> API with a "deadLetterQueueTopicName" attribute dedicated to Kafka 
> >>> Streams DLQ.
> >>> Adding "deadLetterQueueTopicName" as an additional parameter to 
> >>> "withDeadLetterQueueRecord" is a good option, and may allow users to send 
> >>> records to different DLQ topics depending on conditions:
> >>> @Override
> >>> public ProductionExceptionHandlerResponse handle(final ProcessingContext 
> >>> context,
> >>> ProducerRecord<byte[], byte[]> record,
> >>> Exception exception) {
> >>> if (condition1) {
> >>> return ProductionExceptionHandlerResponse.CONTINUE
> >>> .withDeadLetterQueueRecord(record, "dlq-topic-a");
> >>> }
> >>> if (condition2) {
> >>> return ProductionExceptionHandlerResponse.CONTINUE
> >>> .withDeadLetterQueueRecord(record, "dlq-topic-b");
> >>> }
> >>> return ProductionExceptionHandlerResponse.CONTINUE
> >>> .withDeadLetterQueueRecord(record, "dlq-topic-c");
> >>> }
> >>>
> >>> S5B:
> >>>> I was having a bit of trouble understanding what the behavior would be 
> >>>> if someone configured a "errors.deadletterqueue.topic.name" but didn't 
> >>>> implement the handlers.
> >>>
> >>> The provided LogAndContinueExceptionHandler, LogAndFailExceptionHandler 
> >>> and DefaultProductionExceptionHandler should be able to tell if records 
> >>> should be sent to DLQ or not.
> >>> The "errors.deadletterqueue.topic.name" takes place to:
> >>>
> >>> * Specifying if the provided handlers should or should not send records 
> >>> to DLQ.
> >>> * If the value is empty, the handlers should not send records to DLQ.
> >>> * If the value is not empty, the handlers should send records to DLQ.
> >>> * Define the name of the DLQ topic that should be used by the provided 
> >>> handlers.
> >>>
> >>> Thus, if "errors.deadletterqueue.topic.name" is defined, the provided 
> >>> handlers should return either:
> >>>
> >>> * CONTINUE.withDeadLetterQueueRecord(record, defaultDeadLetterQueue)
> >>> * FAIL.withDeadLetterQueueRecord(record, defaultDeadLetterQueue).
> >>> If "errors.deadletterqueue.topic.name" is defined but neither 
> >>> DeserializationExceptionHandler nor ProductionExceptionHandler classes 
> >>> are defined in the configuration, then nothing should happen as sending 
> >>> to DLQ is based on handlers’ response.
> >>> When providing custom handlers, users would have the possibility to 
> >>> return:
> >>>
> >>> * FAIL
> >>> * CONTINUE
> >>> * FAIL.withDeadLetterQueueRecord(record, "dlq-topic")
> >>> * CONTINUE.withDeadLetterQueueRecord(record, "dlq-topic")
> >>>
> >>> A DLQ topic name is currently required using the two last response types.
> >>> I am wondering if it could benefit users to ease the use of the default 
> >>> DLQ topic "errors.deadletterqueue.topic.name" when implementing custom 
> >>> handlers, with such kind of implementation:
> >>>
> >>> * FAIL.withDefaultDeadLetterQueueRecord(record)
> >>> * CONTINUE.withDefaultDeadLetterQueueRecord(record)
> >>>
> >>> Regards,
> >>> Loïc
> >>>
> >>> De : Damien Gasparina <d.gaspar...@gmail.com>
> >>> Envoyé : dimanche 14 avril 2024 20:24
> >>> À : dev@kafka.apache.org
> >>> Objet : [EXT] Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
> >>>
> >>> Warning External sender Do not click on any links or open any attachments 
> >>> unless you trust the sender and know the content is safe.
> >>>
> >>> Hi Sophie,
> >>>
> >>> Thanks a lot for your feedback and your detailed comments.
> >>>
> >>> S1.
> >>>> I'm confused -- are you saying that we're introducing a new kind of
> >>> ProducerRecord class for this?
> >>>
> >>> Sorry for the poor wording, that's not what I meant. While writing the
> >>> KIP, I was hesitating between 1. leveraging the Kafka Producer
> >>> ProducerRecord, 2. the Kafka Streams ProducerRecord + a topic name in
> >>> a separate parameter, 3. a new custom interface (e.g.
> >>> DeadLetterQueueRecord).
> >>> As the KafkaProducer ProducerRecord is not used in the Kafka Streams
> >>> API (except ProductionExceptionHandler) and I would like to avoid a
> >>> new interface if not strictly required, I leaned toward option 2.
> >>> Thinking about it, maybe option 1. would be best, but I assume it
> >>> could create confusion with KafkaStreams ProducerRecord. Let me sleep
> >>> on it.
> >>>
> >>> S2. I agree. Following the discussion in KIP-1033 and KIP-1034 and
> >>> your point in S4, it seems more and more likely that we will create a
> >>> new container class containing only the metadata for the exception
> >>> handlers. To be consistent, I think we should use this new
> >>> implementation in all exception handlers.
> >>> The only issue I could think off is that the new interface would
> >>> expose less data than the current ProcessorContext in the
> >>> DeserializationException(e.g. stateDir(), metrics(), getStateStore()),
> >>> thus it could be hard for some users to migrate to the new interface.
> >>> I do expect that only a few users would be impacted as the javadoc is
> >>> very clear: `Note, that the passed in {@link ProcessorContext} only
> >>> allows access to metadata like the task ID.`
> >>>
> >>> S3. I completely agree with you, it is something that might not be
> >>> trivial and should be thoroughly covered by unit tests during the
> >>> implementation.
> >>>
> >>> S4. Good point, I did not notice that the ProductionExceptionHandler
> >>> is also invoked in the producer.send() callback.
> >>> Capturing the ProcessingContext for each in-flight message is probably
> >>> not possible. I think there is no other way to write a custom
> >>> container class holding only the metadata that are essentials, I am
> >>> thinking of storing the following attributes: source topic, partition,
> >>> offset, rawKey, rawValue and taskId.
> >>> Those metadata should be relatively small, but I assume that there
> >>> could be a high number of in-flight messages, especially with at least
> >>> once processing guarantee. Do you think it would be fine memory wise?
> >>>
> >>> S5. As many exceptions are only accessible in exception handlers, and
> >>> we wanted to 1) allow users to customize the DLQ records and 2) have a
> >>> suitable DLQ out of the box implementation, we felt it natural to rely
> >>> on exception handlers, that's also why we created KIP-1033.
> >>> Piggybacking on the enum response was the cleanest way we could think
> >>> off, but we are completely open to suggestions.
> >>>
> >>> S5a. Completely agree with you on this point, for this DLQ approach to
> >>> be complete, the ProcessingExceptionHandler introduced in KIP-1033 is
> >>> required. KIP-1033 is definitely our first priority. We decided to
> >>> kick-off the KIP-1034 discussion as we expected the discussions to be
> >>> dynamic and could potentially impact some choices of KIP-1033.
> >>>
> >>> S5b. In this KIP, we wanted to 1. provide as much flexibility to the
> >>> user as possible; 2. provide a good default implementation
> >>> for the DLQ without having to write custom exception handlers.
> >>> For the default implementation, we introduced a new configuration:
> >>> errors.deadletterqueue.topic.name.
> >>>
> >>> If this configuration is set, it changes the behavior of the provided
> >>> exception handlers to return a DLQ record containing the raw key/value
> >>> + headers + exception metadata in headers.
> >>> If the out of the box implementation is not suitable for a user, e.g.
> >>> the payload needs to be masked in the DLQ, it could implement their
> >>> own exception handlers. The errors.deadletterqueue.topic.name would
> >>> only impact Kafka Streams bundled exception handlers (e.g.
> >>> org.apache.kafka.streams.errors;.LogAndContinueExceptionHandler)
> >>>
> >>> Let me update the KIP to make it clear and also provide examples.
> >>>
> >>> S6/S7. Good point, mea culpa for the camel case, it must have been a
> >>> sugar rush :)
> >>>
> >>> Thanks again for your detailed comments and pointing out S4
> >>> (production exception & Processing Context)!
> >>>
> >>> Cheers,
> >>> Damien
> >>> This email was screened for spam and malicious content but exercise 
> >>> caution anyway.
> >>>
> >>>
> >>>
> >>>
> >>> On Fri, 12 Apr 2024 at 22:04, Sophie Blee-Goldman 
> >>> <sop...@responsive.dev<mailto:sop...@responsive.dev>> wrote:
> >>>>
> >>>> Thanks for the KIP, this will make a lot of people very happy.
> >>>>
> >>>> Wanted to chime in on a few points that have been raised so far and add
> >>>> some of my own (numbering with an S to distinguish my points from the
> >>>> previous ones)
> >>>>
> >>>> S1.
> >>>>
> >>>>> 1.a I really meant ProducerRecord, that's the class used to forward to
> >>>>> downstream processors in the PAPI. The only information missing in
> >>>>> this class is the topic name. I also considered relying on the Kafka
> >>>>> Producer ProducerRecord, but I assume it would not be consistent with
> >>>>> the KafkaStreams API.
> >>>>
> >>>> I'm confused -- are you saying that we're introducing a new kind of
> >>>> ProducerRecord class for this? Why not just use the existing one, ie the
> >>>> o.a.k.clients.producer.ProducerRecord class? This is what the
> >>>> ProductionExceptionHandler uses, so it's definitely "consistent". In 
> >>>> other
> >>>> words, we can remove the "String deadLetterQueueTopicName"
> >>>>
> >>>> S2.
> >>>> I think this would be a good opportunity to also deprecate the existing
> >>>> #handle method of the DeserializationExceptionHandler, and replace it 
> >>>> with
> >>>> one that uses a ProcessingContext instead of the ProcessorContext. Partly
> >>>> for the same reasons about guarding access to the #forward methods, 
> >>>> partly
> >>>> because this method needs to be migrated to the new PAPI interface
> >>>> anyways, and ProcessingContext is part of the new one.
> >>>>
> >>>> S3.
> >>>> Regarding 2a. -- I'm inclined to agree that records which a Punctuator
> >>>> failed to produce should also be sent to the DLQ via the
> >>>> ProductionExceptionHandler. Users will just need to be careful about
> >>>> accessing certain fields of the ProcessingContext that aren't available 
> >>>> in
> >>>> the punctuator, and need to check the Optional returned by the
> >>>> ProcessingContext#recordMetadata API.
> >>>> Also, from an implementation standpoint, it will be really hard to
> >>>> distinguish between a record created by a punctuator vs a processor from
> >>>> within the RecordCollector, which is the class that actually handles
> >>>> sending records to the Streams Producer and invoking the
> >>>> ProductionExceptionHandler. This is because the RecordCollector is at the
> >>>> "end" of the topology graph and doesn't have any context about which of 
> >>>> the
> >>>> upstream processors actually attempted to forward a record.
> >>>>
> >>>> This in itself is at least theoretically solvable, but it leads into my
> >>>> first major new point:
> >>>>
> >>>> S4:
> >>>> I'm deeply worried about passing the ProcessingContext in as a means of
> >>>> forwarding metadata. The problem is that the processing/processor context
> >>>> is a mutable class and is inherently meaningless outside the context of a
> >>>> specific task. And when I said earlier that the RecordCollector sits at
> >>>> the "end" of the topology, I meant that it's literally outside the task's
> >>>> subtopology and is used/shared by all tasks on that StreamThread. So to
> >>>> begin with, there's no guarantee what will actually be returned for
> >>>> essential methods such as the new #rawSourceKey/Value or the existing
> >>>> #recordMetadata
> >>>>
> >>>> For serialization exceptions it'll probably be correct, but for general
> >>>> send errors it almost definitely won't be. In short, this is because we
> >>>> send records to the producer after the sink node, but don't check for 
> >>>> send
> >>>> errors right away since obviously it takes some time for the producer to
> >>>> actually send. In other words, sending/producing records is actually done
> >>>> asynchronously with processing, and we simply check for errors on any
> >>>> previously-sent records
> >>>> during the send on a new record in a sink node. This means the context we
> >>>> would be passing in to a (non-serialization) exception would pretty much
> >>>> always correspond not the the record that experienced the error, but the
> >>>> random record that happened to be being sent when we checked and saw the
> >>>> error for the failed record.
> >>>>
> >>>> This discrepancy, in addition to the whole "sourceRawKey/Value and
> >>>> recordMetadata are null for punctuators" issue, seems like an
> >>>> insurmountable inconsistency that is more likely to cause users confusion
> >>>> or problems than be helpful.
> >>>> We could create a new metadata object and copy over the relevant info 
> >>>> from
> >>>> the ProcessingContext, but I worry that has the potential to explode 
> >>>> memory
> >>>> since we'd need to hold on to it for all in-flight records up until they
> >>>> are either successfully sent or failed and passed in to the
> >>>> ProductionExceptionHandler. But if the metadata is relatively small, it's
> >>>> probably fine. Especially if it's just the raw source key/value. Are
> >>>> there any other parts of the ProcessingContext you think should be made
> >>>> available?
> >>>>
> >>>> Note that this only applies to the ProductionExceptionHandler, as the
> >>>> DeserializationExceptionHandler (and the newly proposed
> >>>> ProcessingExceptionHandler) would both be invoked immediately and 
> >>>> therefore
> >>>> with the failed record's context. However, I'm also a bit uncomfortable
> >>>> with adding the rawSourceKey/rawSourceValue to the ProcessingContext. So
> >>>> I'd propose to just wrap those (and any other metadata you might want) 
> >>>> in a
> >>>> container class and pass that in instead of the ProcessingContext, to all
> >>>> of the exception handlers.
> >>>>
> >>>> S5:
> >>>> For some reason I'm finding the proposed API a little bit awkward, 
> >>>> although
> >>>> it's entirely possible that the problem is with me, not the proposal :)
> >>>> Specifically I'm struggling with the approach of piggybacking on the
> >>>> exception handlers and their response enums to dictate how records are
> >>>> forwarded to the DLQ. I think this comes down to two things, though 
> >>>> again,
> >>>> these aren't necessarily problems with the API and probably just need to 
> >>>> be
> >>>> hashed out:
> >>>>
> >>>> S5a.
> >>>> When I envision a DLQ, to me, the most common use case would be to 
> >>>> forward
> >>>> input records that failed somewhere along the processing graph. But it
> >>>> seems like all the focus here is on the two far ends of the subtopology 
> >>>> --
> >>>> the input/consumer, and the output/producer. I get that
> >>>> the ProcessingExceptionHandler is really the missing piece here, and it's
> >>>> hard to say anything specific since it's not yet accepted, but maybe a
> >>>> somewhat more concrete example would help. FWIW I think/hope to get that
> >>>> KIP accepted and implementation ASAP, so I'm not worried about the "what 
> >>>> if
> >>>> it doesn't happen" case -- more just want to know what it will look like
> >>>> when it does. Imo it's fine to build KIPs on top of future ones, it feels
> >>>> clear that this part will just have to wait for that KIP to actually be
> >>>> added.
> >>>>
> >>>> S5b:
> >>>> Why do users have to define the entire ProducerRecord -- shouldn't 
> >>>> Streams
> >>>> handle all this for them? Or will we just automatically send every record
> >>>> on failure to the default global DLQ, and users only have to implement 
> >>>> the
> >>>> handlers if they want to change the headers or send to a different 
> >>>> topic? I
> >>>> was having a bit of trouble understanding what the behavior would be if
> >>>> someone configured a "errors.deadletterqueue.topic.name" but didn't
> >>>> implement the handlers. Apologies if it's somewhere in the KIP and I
> >>>> happened to miss it!
> >>>>
> >>>> Either way, I really think an example would help me to better imagine 
> >>>> what
> >>>> this will look like in practice, and evaluate whether it actually 
> >>>> involves
> >>>> as much overhead as I'm worried it will. Can you add a section that
> >>>> includes a basic implementation of all the features here? Nothing too
> >>>> complicated, just the most bare-bones code needed to actually implement
> >>>> forwarding to a dead-letter-queue via the handlers.
> >>>>
> >>>> Lastly, two super small things:
> >>>>
> >>>> S6:
> >>>> We use camel case in Streams, so it should be rawSourceKey/Value rather
> >>>> than raw_source_key/value
> >>>>
> >>>> S7:
> >>>> Can you add javadocs for the #withDeadLetterQueueRecord? For example, it
> >>>> seems to me that if the topic to be sent to here is different than the
> >>>> default/global DLQ, then the user will need to make sure to have created
> >>>> this themselves up front.
> >>>>
> >>>> That's it from me...sorry for the long response, it's just because I'm
> >>>> excited for this feature and have been waiting on a KIP for this for 
> >>>> years.
> >>>>
> >>>> Cheers,
> >>>> Sophie
> >>>>
> >>>>
> >>>> On Fri, Apr 12, 2024 at 11:10 AM Damien Gasparina 
> >>>> <d.gaspar...@gmail.com<mailto:d.gaspar...@gmail.com>>
> >>>> wrote:
> >>>>
> >>>>> Hi Andrew,
> >>>>>
> >>>>> Thanks a lot for your review, plenty of good points!
> >>>>>
> >>>>> 11. Typo fixed, good cach.
> >>>>>
> >>>>> 12. I do agree with you and Nick also mentioned it, I updated the KIP
> >>>>> to mention that context headers should be forwarded.
> >>>>>
> >>>>> 13. Good catch, to be consistent with KIP-298, and without a strong
> >>>>> opinion from my side, I updated the KIP with your prefix proposal.
> >>>>>
> >>>>> 14. I am not sure about this point, a big difference between KIP-298
> >>>>> and this one is that the handlers can easily be overridden, something
> >>>>> that is not doable in Kafka Connect.
> >>>>> If someone would like a different behavior, e.g. to mask the payload
> >>>>> or include further headers, I think we should encourage them to write
> >>>>> their own exception handlers to build the DLQ Record the way they
> >>>>> expect.
> >>>>>
> >>>>> 15. Yeah, that's a good point, I was not fully convinced about putting
> >>>>> a String in it, I do assume that "null" is also a valid value. I do
> >>>>> assume that the Stacktrace and the Exception in this case are the key
> >>>>> metadata for the user to troubleshoot the problem.
> >>>>> I updated the KIP to mention that the value should be null if
> >>>>> triggered in a punctuate.
> >>>>>
> >>>>> 16. I added a session to mention that Kafka Streams would not try to
> >>>>> automatically create the topic and the topic should either be
> >>>>> automatically created, or pre-created.
> >>>>>
> >>>>> 17. If a DLQ record can not be sent, the exception should go to the
> >>>>> uncaughtExceptionHandler. Let me clearly state it in the KIP.
> >>>>>
> >>>>> On Fri, 12 Apr 2024 at 17:25, Damien Gasparina 
> >>>>> <d.gaspar...@gmail.com<mailto:d.gaspar...@gmail.com>>
> >>>>> wrote:
> >>>>>>
> >>>>>> Hi Nick,
> >>>>>>
> >>>>>> 1. Good point, that's less impactful than a custom interface, I just
> >>>>>> updated the KIP with the new signature.
> >>>>>>
> >>>>>> 1.a I really meant ProducerRecord, that's the class used to forward to
> >>>>>> downstream processors in the PAPI. The only information missing in
> >>>>>> this class is the topic name. I also considered relying on the Kafka
> >>>>>> Producer ProducerRecord, but I assume it would not be consistent with
> >>>>>> the KafkaStreams API.
> >>>>>>
> >>>>>> 2. Agreed
> >>>>>>
> >>>>>> 2.a I do think exceptions occurring during punctuate should be
> >>>>>> included in the DLQ.
> >>>>>> Even if building a suitable payload is almost impossible, even with
> >>>>>> custom code; those exceptions are still fatal for Kafka Streams by
> >>>>>> default and are something that can not be ignored safely.
> >>>>>> I do assume that most users would want to be informed if an error
> >>>>>> happened during a punctuate, even if only the metadata (e.g.
> >>>>>> stacktrace, exception) is provided.
> >>>>>> I am only concerned flooding the DLQ topic as, if a scheduled
> >>>>>> operation failed, very likely it will fails during the next
> >>>>>> invocation, but
> >>>>>>
> >>>>>> 4. Good point, I clarified the wording in the KIP to make it explicit.
> >>>>>>
> >>>>>> 5. Good point, I will clearly mention that it is out of scope as part
> >>>>>> of the KIP and might not be as trivial as people could expect. I will
> >>>>>> update the KIP once I do have some spare time.
> >>>>>>
> >>>>>> 6. Oh yeah, I didn't think about it, but forwarding input headers
> >>>>>> would definitely make sense. Confluent Schema Registry ID is actually
> >>>>>> part of the payload, but many correlation ID and technical metadata
> >>>>>> are passed through headers, it makes sense to forward them, specially
> >>>>>> as it is the default behavior of Kafka Streams,
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Fri, 12 Apr 2024 at 15:25, Nick Telford 
> >>>>>> <nick.telf...@gmail.com<mailto:nick.telf...@gmail.com>>
> >>>>> wrote:
> >>>>>>>
> >>>>>>> Hi Damien and Sebastien,
> >>>>>>>
> >>>>>>> 1.
> >>>>>>> I think you can just add a `String topic` argument to the existing
> >>>>>>> `withDeadLetterQueueRecord(ProducerRecord<byte[], byte[]>
> >>>>>>> deadLetterQueueRecord)` method, and then the implementation of the
> >>>>>>> exception handler could choose the topic to send records to using
> >>>>> whatever
> >>>>>>> logic the user desires. You could perhaps provide a built-in
> >>>>> implementation
> >>>>>>> that leverages your new config to send all records to an untyped DLQ
> >>>>> topic?
> >>>>>>>
> >>>>>>> 1a.
> >>>>>>> BTW you have a typo: in your DeserializationExceptionHandler, the type
> >>>>> of
> >>>>>>> your `deadLetterQueueRecord` argument is `ProducerRecord`, when it
> >>>>> should
> >>>>>>> probably be `ConsumerRecord`.
> >>>>>>>
> >>>>>>> 2.
> >>>>>>> Agreed. I think it's a good idea to provide an implementation that
> >>>>> sends to
> >>>>>>> a single DLQ by default, but it's important to enable users to
> >>>>> customize
> >>>>>>> this with their own exception handlers.
> >>>>>>>
> >>>>>>> 2a.
> >>>>>>> I'm not convinced that "errors" (e.g. failed punctuate) should be sent
> >>>>> to a
> >>>>>>> DLQ topic like it's a bad record. To me, a DLQ should only contain
> >>>>> records
> >>>>>>> that failed to process. I'm not even sure how a user would
> >>>>>>> re-process/action one of these other errors; it seems like the purview
> >>>>> of
> >>>>>>> error logging to me?
> >>>>>>>
> >>>>>>> 4.
> >>>>>>> My point here was that I think it would be useful for the KIP to
> >>>>> contain an
> >>>>>>> explanation of the behavior both with KIP-1033 and without it. i.e.
> >>>>> clarify
> >>>>>>> if/how records that throw an exception in a processor are handled. At
> >>>>> the
> >>>>>>> moment, I'm assuming that without KIP-1033, processing exceptions
> >>>>> would not
> >>>>>>> cause records to be sent to the DLQ, but with KIP-1033, they would. If
> >>>>> this
> >>>>>>> assumption is correct, I think it should be made explicit in the KIP.
> >>>>>>>
> >>>>>>> 5.
> >>>>>>> Understood. You may want to make this explicit in the documentation 
> >>>>>>> for
> >>>>>>> users, so they understand the consequences of re-processing data sent
> >>>>> to
> >>>>>>> their DLQ. The main reason I raised this point is it's something 
> >>>>>>> that's
> >>>>>>> tripped me up in numerous KIPs that that committers frequently remind
> >>>>> me
> >>>>>>> of; so I wanted to get ahead of it for once! :D
> >>>>>>>
> >>>>>>> And one new point:
> >>>>>>> 6.
> >>>>>>> The DLQ record schema appears to discard all custom headers set on the
> >>>>>>> source record. Is there a way these can be included? In particular, 
> >>>>>>> I'm
> >>>>>>> concerned with "schema pointer" headers (like those set by Schema
> >>>>>>> Registry), that may need to be propagated, especially if the records
> >>>>> are
> >>>>>>> fed back into the source topics for re-processing by the user.
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>> Nick
> >>>>>>>
> >>>>>>>
> >>>>>>> On Fri, 12 Apr 2024 at 13:20, Damien Gasparina 
> >>>>>>> <d.gaspar...@gmail.com<mailto:d.gaspar...@gmail.com>>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi Nick,
> >>>>>>>>
> >>>>>>>> Thanks a lot for your review and your useful comments!
> >>>>>>>>
> >>>>>>>> 1. It is a good point, as you mentioned, I think it would make sense
> >>>>>>>> in some use cases to have potentially multiple DLQ topics, so we
> >>>>>>>> should provide an API to let users do it.
> >>>>>>>> Thinking out-loud here, maybe it is a better approach to create a new
> >>>>>>>> Record class containing the topic name, e.g. DeadLetterQueueRecord
> >>>>> and
> >>>>>>>> changing the signature to
> >>>>>>>> withDeadLetterQueueRecords(Iteratable<DeadLetterQueueRecord>
> >>>>>>>> deadLetterQueueRecords) instead of
> >>>>>>>> withDeadLetterQueueRecord(ProducerRecord<byte[], byte[]>
> >>>>>>>> deadLetterQueueRecord). What do you think? DeadLetterQueueRecord
> >>>>> would
> >>>>>>>> be something like "class DeadLetterQueueRecord extends
> >>>>>>>> org.apache.kafka.streams.processor.api;.ProducerRecords { String
> >>>>>>>> topic; /* + getter/setter + */ } "
> >>>>>>>>
> >>>>>>>> 2. I think the root question here is: should we have one DLQ topic or
> >>>>>>>> multiple DLQ topics by default. This question highly depends on the
> >>>>>>>> context, but implementing a default implementation to handle multiple
> >>>>>>>> DLQ topics would be opinionated, e.g. how to manage errors in a
> >>>>>>>> punctuate?
> >>>>>>>> I think it makes sense to have the default implementation writing all
> >>>>>>>> faulty records to a single DLQ, that's at least the approach I used
> >>>>> in
> >>>>>>>> past applications: one DLQ per Kafka Streams application. Of course
> >>>>>>>> the message format could change in the DLQ e.g. due to the source
> >>>>>>>> topic, but those DLQ records will be very likely troubleshooted, and
> >>>>>>>> maybe replay, manually anyway.
> >>>>>>>> If a user needs to have multiple DLQ topics or want to enforce a
> >>>>>>>> specific schema, it's still possible, but they would need to
> >>>>> implement
> >>>>>>>> custom Exception Handlers.
> >>>>>>>> Coming back to 1. I do agree that it would make sense to have the
> >>>>> user
> >>>>>>>> set the DLQ topic name in the handlers for more flexibility.
> >>>>>>>>
> >>>>>>>> 3. Good point, sorry it was a typo, the ProcessingContext makes much
> >>>>>>>> more sense here indeed.
> >>>>>>>>
> >>>>>>>> 4. I do assume that we could implement KIP-1033 (Processing exception
> >>>>>>>> handler) independently from KIP-1034. I do hope that KIP-1033 would
> >>>>> be
> >>>>>>>> adopted and implemented before KIP-1034, but if that's not the case,
> >>>>>>>> we could implement KIP-1034 indepantly and update KIP-1033 to include
> >>>>>>>> the DLQ record afterward (in the same KIP or in a new one if not
> >>>>>>>> possible).
> >>>>>>>>
> >>>>>>>> 5. I think we should be clear that this KIP only covers the DLQ
> >>>>> record
> >>>>>>>> produced.
> >>>>>>>> Everything related to replay messages or recovery plan should be
> >>>>>>>> considered out-of-scope as it is use-case and error specific.
> >>>>>>>>
> >>>>>>>> Let me know if that's not clear, there are definitely points that
> >>>>>>>> highly debatable.
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>> Damien
> >>>>>>>>
> >>>>>>>> On Fri, 12 Apr 2024 at 13:00, Nick Telford 
> >>>>>>>> <nick.telf...@gmail.com<mailto:nick.telf...@gmail.com>>
> >>>>> wrote:
> >>>>>>>>>
> >>>>>>>>> Oh, and one more thing:
> >>>>>>>>>
> >>>>>>>>> 5.
> >>>>>>>>> Whenever you take a record out of the stream, and then potentially
> >>>>>>>>> re-introduce it at a later date, you introduce the potential for
> >>>>> record
> >>>>>>>>> ordering issues. For example, that record could have been destined
> >>>>> for a
> >>>>>>>>> Window that has been closed by the time it's re-processed. I'd
> >>>>> like to
> >>>>>>>> see
> >>>>>>>>> a section that considers these consequences, and perhaps make
> >>>>> those risks
> >>>>>>>>> clear to users. For the record, this is exactly what sunk KIP-990,
> >>>>> which
> >>>>>>>>> was an alternative approach to error handling that introduced the
> >>>>> same
> >>>>>>>>> issues.
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>>
> >>>>>>>>> Nick
> >>>>>>>>>
> >>>>>>>>> On Fri, 12 Apr 2024 at 11:54, Nick Telford <nick.telf...@gmail.com
> >>> <mailto:nick.telf...@gmail.com%0b>> > >
> >>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Damien,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for the KIP! Dead-letter queues are something that I
> >>>>> think a
> >>>>>>>> lot of
> >>>>>>>>>> users would like.
> >>>>>>>>>>
> >>>>>>>>>> I think there are a few points with this KIP that concern me:
> >>>>>>>>>>
> >>>>>>>>>> 1.
> >>>>>>>>>> It looks like you can only define a single, global DLQ for the
> >>>>> entire
> >>>>>>>>>> Kafka Streams application? What about applications that would
> >>>>> like to
> >>>>>>>>>> define different DLQs for different data flows? This is
> >>>>> especially
> >>>>>>>>>> important when dealing with multiple source topics that have
> >>>>> different
> >>>>>>>>>> record schemas.
> >>>>>>>>>>
> >>>>>>>>>> 2.
> >>>>>>>>>> Your DLQ payload value can either be the record value that
> >>>>> failed, or
> >>>>>>>> an
> >>>>>>>>>> error string (such as "error during punctuate"). This is likely
> >>>>> to
> >>>>>>>> cause
> >>>>>>>>>> problems when users try to process the records from the DLQ, as
> >>>>> they
> >>>>>>>> can't
> >>>>>>>>>> guarantee the format of every record value will be the same.
> >>>>> This is
> >>>>>>>> very
> >>>>>>>>>> loosely related to point 1. above.
> >>>>>>>>>>
> >>>>>>>>>> 3.
> >>>>>>>>>> You provide a ProcessorContext to both exception handlers, but
> >>>>> state
> >>>>>>>> they
> >>>>>>>>>> cannot be used to forward records. In that case, I believe you
> >>>>> should
> >>>>>>>> use
> >>>>>>>>>> ProcessingContext instead, which statically guarantees that it
> >>>>> can't be
> >>>>>>>>>> used to forward records.
> >>>>>>>>>>
> >>>>>>>>>> 4.
> >>>>>>>>>> You mention the KIP-1033 ProcessingExceptionHandler, but what's
> >>>>> the
> >>>>>>>> plan
> >>>>>>>>>> if KIP-1033 is not adopted, or if KIP-1034 lands before 1033?
> >>>>>>>>>>
> >>>>>>>>>> Regards,
> >>>>>>>>>>
> >>>>>>>>>> Nick
> >>>>>>>>>>
> >>>>>>>>>> On Fri, 12 Apr 2024 at 11:38, Damien Gasparina <
> >>>>> d.gaspar...@gmail.com<mailto:d.gaspar...@gmail.com>>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> In a general way, if the user does not configure the right ACL,
> >>>>> that
> >>>>>>>>>>> would be a security issue, but that's true for any topic.
> >>>>>>>>>>>
> >>>>>>>>>>> This KIP allows users to configure a Dead Letter Queue without
> >>>>> writing
> >>>>>>>>>>> custom Java code in Kafka Streams, not at the topic level.
> >>>>>>>>>>> A lot of applications are already implementing this pattern,
> >>>>> but the
> >>>>>>>>>>> required code to do it is quite painful and error prone, for
> >>>>> example
> >>>>>>>>>>> most apps I have seen created a new KafkaProducer to send
> >>>>> records to
> >>>>>>>>>>> their DLQ.
> >>>>>>>>>>>
> >>>>>>>>>>> As it would be disabled by default for backward compatibility,
> >>>>> I doubt
> >>>>>>>>>>> it would generate any security concern.
> >>>>>>>>>>> If a user explicitly configures a Deal Letter Queue, it would
> >>>>> be up to
> >>>>>>>>>>> him to configure the relevant ACLs to ensure that the right
> >>>>> principal
> >>>>>>>>>>> can access it.
> >>>>>>>>>>> It is already the case for all internal, input and output Kafka
> >>>>>>>>>>> Streams topics (e.g. repartition, changelog topics) that also
> >>>>> could
> >>>>>>>>>>> contain confidential data, so I do not think we should
> >>>>> implement a
> >>>>>>>>>>> different behavior for this one.
> >>>>>>>>>>>
> >>>>>>>>>>> In this KIP, we configured the default DLQ record to have the
> >>>>> initial
> >>>>>>>>>>> record key/value as we assume that it is the expected and wanted
> >>>>>>>>>>> behavior for most applications.
> >>>>>>>>>>> If a user does not want to have the key/value in the DLQ record
> >>>>> for
> >>>>>>>>>>> any reason, they could still implement exception handlers to
> >>>>> build
> >>>>>>>>>>> their own DLQ record.
> >>>>>>>>>>>
> >>>>>>>>>>> Regarding ACL, maybe something smarter could be done in Kafka
> >>>>> Streams,
> >>>>>>>>>>> but this is out of scope for this KIP.
> >>>>>>>>>>>
> >>>>>>>>>>> On Fri, 12 Apr 2024 at 11:58, Claude Warren 
> >>>>>>>>>>> <cla...@xenei.com<mailto:cla...@xenei.com>>
> >>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> My concern is that someone would create a dead letter queue
> >>>>> on a
> >>>>>>>>>>> sensitive
> >>>>>>>>>>>> topic and not get the ACL correct from the start. Thus
> >>>>> causing
> >>>>>>>>>>> potential
> >>>>>>>>>>>> confidential data leak. Is there anything in the proposal
> >>>>> that
> >>>>>>>> would
> >>>>>>>>>>>> prevent that from happening? If so I did not recognize it as
> >>>>> such.
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Fri, Apr 12, 2024 at 9:45 AM Damien Gasparina <
> >>>>>>>> d.gaspar...@gmail.com<mailto:d.gaspar...@gmail.com>
> >>>>>>>>>>>>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi Claude,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> In this KIP, the Dead Letter Queue is materialized by a
> >>>>> standard
> >>>>>>>> and
> >>>>>>>>>>>>> independant topic, thus normal ACL applies to it like any
> >>>>> other
> >>>>>>>> topic.
> >>>>>>>>>>>>> This should not introduce any security issues, obviously,
> >>>>> the
> >>>>>>>> right
> >>>>>>>>>>>>> ACL would need to be provided to write to the DLQ if
> >>>>> configured.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>> Damien
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Fri, 12 Apr 2024 at 08:59, Claude Warren, Jr
> >>>>>>>>>>>>> <claude.war...@aiven.io.invalid<mailto:claude.war...@aiven.io.invalid>>
> >>>>>>>>>>>>>  wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I am new to the Kafka codebase so please excuse any
> >>>>> ignorance
> >>>>>>>> on my
> >>>>>>>>>>> part.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> When a dead letter queue is established is there a
> >>>>> process to
> >>>>>>>>>>> ensure that
> >>>>>>>>>>>>>> it at least is defined with the same ACL as the original
> >>>>> queue?
> >>>>>>>>>>> Without
> >>>>>>>>>>>>>> such a guarantee at the start it seems that managing dead
> >>>>> letter
> >>>>>>>>>>> queues
> >>>>>>>>>>>>>> will be fraught with security issues.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Wed, Apr 10, 2024 at 10:34 AM Damien Gasparina <
> >>>>>>>>>>> d.gaspar...@gmail.com<mailto:d.gaspar...@gmail.com>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> To continue on our effort to improve Kafka Streams error
> >>>>>>>>>>> handling, we
> >>>>>>>>>>>>>>> propose a new KIP to add out of the box support for Dead
> >>>>>>>> Letter
> >>>>>>>>>>> Queue.
> >>>>>>>>>>>>>>> The goal of this KIP is to provide a default
> >>>>> implementation
> >>>>>>>> that
> >>>>>>>>>>>>>>> should be suitable for most applications and allow
> >>>>> users to
> >>>>>>>>>>> override
> >>>>>>>>>>>>>>> it if they have specific requirements.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> In order to build a suitable payload, some additional
> >>>>> changes
> >>>>>>>> are
> >>>>>>>>>>>>>>> included in this KIP:
> >>>>>>>>>>>>>>> 1. extend the ProcessingContext to hold, when
> >>>>> available, the
> >>>>>>>>>>> source
> >>>>>>>>>>>>>>> node raw key/value byte[]
> >>>>>>>>>>>>>>> 2. expose the ProcessingContext to the
> >>>>>>>>>>> ProductionExceptionHandler,
> >>>>>>>>>>>>>>> it is currently not available in the handle parameters.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Regarding point 2., to expose the ProcessingContext to
> >>>>> the
> >>>>>>>>>>>>>>> ProductionExceptionHandler, we considered two choices:
> >>>>>>>>>>>>>>> 1. exposing the ProcessingContext as a parameter in
> >>>>> the
> >>>>>>>> handle()
> >>>>>>>>>>>>>>> method. That's the cleanest way IMHO, but we would need
> >>>>> to
> >>>>>>>>>>> deprecate
> >>>>>>>>>>>>>>> the old method.
> >>>>>>>>>>>>>>> 2. exposing the ProcessingContext as an attribute in
> >>>>> the
> >>>>>>>>>>> interface.
> >>>>>>>>>>>>>>> This way, no method is deprecated, but we would not be
> >>>>>>>> consistent
> >>>>>>>>>>> with
> >>>>>>>>>>>>>>> the other ExceptionHandler.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> In the KIP, we chose the 1. solution (new handle
> >>>>> signature
> >>>>>>>> with
> >>>>>>>>>>> old
> >>>>>>>>>>>>>>> one deprecated), but we could use other opinions on
> >>>>> this part.
> >>>>>>>>>>>>>>> More information is available directly on the KIP.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> KIP link:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>
> >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams<https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams><https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams<https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Feedbacks and suggestions are welcome,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>> Damien, Sebastien and Loic
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> --
> >>>>>>>>>>>> LinkedIn: 
> >>>>>>>>>>>> http://www.linkedin.com/in/claudewarren<http://www.linkedin.com/in/claudewarren><http://www.linkedin.com/in/claudewarren<http://www.linkedin.com/in/claudewarren>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>
> >>>>>
> >>

Reply via email to