Hi Alieh,

Thanks for the KIP!

A few questions:

K1. What is the expected behavior for the producer if it generates a 
RecordTooLargeException, but the handler returns RETRY?
K2. How do we determine which record was responsible for the
UnknownTopicOrPartitionException, since we get that response when sending a
batch of records?
K3. What is the expected behavior if the handle() method itself throws an error?
K4. What is the downside of adding an onError() method to the Producer’s 
Callback interface vs. a new mechanism?
K5. Can we change “ProducerExceptionHandlerResponse” to just “Response” given
that it’s an inner enum?
K6. Any recommendation for callback authors to handle different behavior for 
different topics?

I’ll echo what others have said: it would help me understand why we want
another handler class if there were more examples in the Motivation section. As
it stands now, I agree with Chris that the stated issues could be solved by
adding two new configuration options:

    oversized.record.behavior=fail
    retry.on.unknown.topic.or.partition=true

What I’m not yet able to wrap my head around is: what exactly would the logic 
in the handler be? I’m not very imaginative, so I’m assuming they’d mostly be 
if-this-then-that. However, if they’re more complicated, I’d have other 
concerns.
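
To make that assumption concrete, here is a rough sketch of the kind of
if-this-then-that handler I'm picturing, including per-topic behavior (K6).
Everything in it is hypothetical: the Response names (FAIL, RETRY, SWALLOW),
the handle(topic, exception) signature, the local stand-in exception classes,
and the topic names are placeholders so that it compiles without the Kafka
client jar. The KIP's actual handle() takes a ProducerRecord.

```java
import java.util.Set;

// Hypothetical stand-ins only; none of these types are the KIP's actual API.
class HandlerSketch {

    // Assumed names: the thread only says "fail and two types of continue".
    enum Response { FAIL, RETRY, SWALLOW }

    // Local stand-ins for the two exceptions the KIP currently covers.
    static class RecordTooLargeException extends RuntimeException { }
    static class UnknownTopicOrPartitionException extends RuntimeException { }

    // Assumed shape: takes a topic name here instead of a ProducerRecord.
    interface ProducerExceptionHandler {
        Response handle(String topic, Exception exception);
    }

    // Per-topic, if-this-then-that logic.
    static class TopicAwareHandler implements ProducerExceptionHandler {
        private final Set<String> bestEffortTopics;

        TopicAwareHandler(Set<String> bestEffortTopics) {
            this.bestEffortTopics = bestEffortTopics;
        }

        @Override
        public Response handle(String topic, Exception exception) {
            boolean bestEffort = bestEffortTopics.contains(topic);
            if (exception instanceof RecordTooLargeException) {
                // Resending the same oversized record would presumably fail
                // again, so the choice here is only drop vs. fail.
                return bestEffort ? Response.SWALLOW : Response.FAIL;
            }
            if (exception instanceof UnknownTopicOrPartitionException) {
                // The topic may simply not exist yet, so keep retrying for
                // topics whose records we do not want to lose.
                return bestEffort ? Response.SWALLOW : Response.RETRY;
            }
            // Anything else keeps today's behavior.
            return Response.FAIL;
        }
    }

    public static void main(String[] args) {
        ProducerExceptionHandler handler = new TopicAwareHandler(Set.of("metrics"));
        System.out.println(handler.handle("metrics", new RecordTooLargeException()));
        System.out.println(handler.handle("orders", new UnknownTopicOrPartitionException()));
    }
}
```

If real handlers stay about this simple, I suspect the two config options
above would cover them.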

Thanks,
Kirk

> On Apr 22, 2024, at 7:38 AM, Alieh Saeedi <asae...@confluent.io.INVALID> 
> wrote:
> 
> Thank you all for the feedback!
> 
> Addressing the main concern: The KIP is about giving the user the ability
> to handle producer exceptions, but to be more conservative and avoid future
> issues, we decided to limit it to a short list of exceptions. I included
> *RecordTooLargeException* and *UnknownTopicOrPartitionException*. I'm open to
> suggestions for adding some more ;-)
> 
> KIP Updates:
> - clarified the way that the user should configure the Producer to use the
> custom handler. I think adding a producer config property is the cleanest
> one.
> - changed the *ClientExceptionHandler* to *ProducerExceptionHandler* to be
> closer to what we are changing.
> - added the ProducerRecord as the input parameter of the handle() method as
> well.
> - increased the response types to 3 to have fail and two types of continue.
> - The default behaviour is having no custom handler, having the
> corresponding config parameter set to null. Therefore, the KIP provides no
> default implementation of the interface.
> - We follow the interface solution as described in the
> Rejected Alternatives section.
> 
> 
> Cheers,
> Alieh
> 
> 
> On Thu, Apr 18, 2024 at 8:11 PM Matthias J. Sax <mj...@apache.org> wrote:
> 
>> Thanks for the KIP Alieh! It addresses an important case for error
>> handling.
>> 
>> I agree that using this handler would be an expert API, as mentioned by
>> a few people. But I don't think it would be a reason to not add it. It's
>> always a tricky tradeoff what to expose to users and to avoid foot guns,
>> but we added similar handlers to Kafka Streams, and have good experience
>> with it. Hence, I understand, but don't share the concern raised.
>> 
>> I also agree that there is some responsibility by the user to understand
>> how such a handler should be implemented to not drop data by accident.
>> But it seems unavoidable and acceptable.
>> 
>> While I understand that a "simpler / reduced" API (eg via configs) might
>> also work, I personally prefer a full handler. Configs have the same
>> issue that they could be misused, potentially leading to incorrectly
>> dropped data, but at the same time are less flexible (and thus maybe
>> even harder to use correctly...?). Based on my experience, there are also
>> often weird corner cases for which it makes sense to also drop records for
>> other exceptions, and a full handler has the advantage of full
>> flexibility and "absolute power!".
>> 
>> To be fair: I don't know the exact code paths of the producer in
>> detail, so please keep me honest. But my understanding is that the KIP
>> aims to allow users to react to internal exceptions, and decide to keep
>> retrying internally, swallow the error and drop the record, or raise the
>> error?
>> 
>> Maybe the KIP would need to be a little bit more precise about which errors
>> we want to cover -- I don't think this list must be exhaustive, as we can
>> always do a follow-up KIP to also apply the handler to other errors to
>> expand the scope of the handler. The KIP does mention examples, but it
>> might be good to explicitly state for what cases the handler gets applied?
>> 
>> I am also not sure if CONTINUE and FAIL are enough options? Don't we
>> need three options? Or would `CONTINUE` have different meaning depending
>> on the type of error? Ie, for a retryable error `CONTINUE` would mean
>> keep retrying internally, but for a non-retryable error `CONTINUE` means
>> swallow the error and drop the record? This semantic overload seems
>> tricky for users to reason about, so it might be better to split `CONTINUE`
>> into two cases -> `RETRY` and `SWALLOW` (or some better names).
>> 
>> Additionally, should we just ship a `DefaultClientExceptionHandler`
>> which would return `FAIL`, for backward compatibility? Or not have any
>> default handler to begin with and allow it to be `null`? I don't see the
>> need for a specific `TransactionExceptionHandler`. To me, the goal
>> should be to not modify the default behavior at all, but to just allow
>> users to change the default behavior if there is a need.
>> 
>> What is missing from the KIP, though, is how the handler is passed into the
>> producer. Would we need a new config which allows setting a
>> custom handler? And/or would we allow passing in an instance via the
>> constructor or add a new method to set a handler?
>> 
>> 
>> -Matthias
>> 
>> On 4/18/24 10:02 AM, Andrew Schofield wrote:
>>> Hi Alieh,
>>> Thanks for the KIP.
>>> 
>>> Exception handling in the Kafka producer and consumer is really not ideal.
>>> It’s even harder working out what’s going on with the consumer.
>>> 
>>> I’m a bit nervous about this KIP and I agree with Chris that it could do
>>> with additional motivation. This would be an expert-level interface given
>>> how complicated the exception handling for Kafka has become.
>>> 
>>> 7. The application is not really aware of the batching being done on its
>>> behalf. The ProduceResponse can actually return an array of records which
>>> failed per batch. If you get RecordTooLargeException, and want to retry, you
>>> probably need to remove the offending records from the batch and retry it.
>>> This is getting fiddly.
>>> 
>>> 8. There is already o.a.k.clients.producer.Callback. I wonder whether an
>>> alternative might be to add a method to the existing Callback interface,
>>> such as:
>>> 
>>>   ClientExceptionResponse onException(Exception exception)
>>> 
>>> It would be called when a ProduceResponse contains an error, but the
>>> producer is going to retry. It tells the producer whether to go ahead
>>> with the retry or not. The default implementation would be to CONTINUE,
>>> because that’s just continuing to retry as planned. Note that this is a
>>> per-record callback, so the application would be able to understand which
>>> records failed.
>>> 
>>> By using an existing interface, we already know how to configure it and
>>> we know about the threading model for calling it.
>>> 
>>> 
>>> Thanks,
>>> Andrew
>>> 
>>> 
>>> 
>>>> On 17 Apr 2024, at 18:17, Chris Egerton <chr...@aiven.io.INVALID>
>>>> wrote:
>>>> 
>>>> Hi Alieh,
>>>> 
>>>> Thanks for the KIP! The issue with writing to non-existent topics is
>>>> particularly frustrating for users of Kafka Connect and has been the
>>>> source of a handful of Jira tickets over the years. My thoughts:
>>>> 
>>>> 1. An additional detail we can add to the motivation (or possibly
>>>> rejected alternatives) section is that this kind of custom retry logic
>>>> can't be implemented by hand by, e.g., setting retries to 0 in the
>>>> producer config and handling exceptions at the application level. Or
>>>> rather, it can, but 1) it's a bit painful to have to reimplement at every
>>>> call-site for Producer::send (and any code that awaits the returned
>>>> Future) and 2) it's impossible to do this without losing idempotency on
>>>> retries.
>>>> 
>>>> 2. That said, I wonder if a pluggable interface is really the right call
>>>> here. Documenting the interactions of a producer with
>>>> a ClientExceptionHandler instance will be tricky, and implementing them
>>>> will also be a fair amount of work. I believe that there needs to be
>>>> some more granularity for how writes to non-existent topics (or really,
>>>> UNKNOWN_TOPIC_OR_PARTITION and related errors from the broker) are
>>>> handled, but I'm torn between keeping it simple with maybe one or two new
>>>> producer config properties, or a full-blown pluggable interface. If there
>>>> are more cases that would benefit from a pluggable interface, it would be
>>>> nice to identify these and add them to the KIP to strengthen the
>>>> motivation. Right now, I'm not sure the two that are mentioned in the
>>>> motivation are sufficient.
>>>> 
>>>> 3. Alternatively, a possible compromise is for this KIP to introduce new
>>>> properties that dictate how to handle unknown-topic-partition and
>>>> record-too-large errors, with the thinking that if we introduce a
>>>> pluggable interface later on, these properties will be recognized by the
>>>> default implementation of that interface but could be completely ignored
>>>> or replaced by alternative implementations.
>>>> 
>>>> 4. (Nit) You can remove the "This page is meant as a template for
>>>> writing a KIP..." part from the KIP. It's not a template anymore :)
>>>> 
>>>> 5. If we do go the pluggable interface route, wouldn't we want to add
>>>> the possibility for retry logic? The simplest version of this could be to
>>>> add a RETRY value to the ClientExceptionHandlerResponse enum.
>>>> 
>>>> 6. I think "SKIP" or "DROP" might be clearer instead of "CONTINUE" for
>>>> the ClientExceptionHandlerResponse enum, since they cause records to be
>>>> dropped.
>>>> 
>>>> Cheers,
>>>> 
>>>> Chris
>>>> 
>>>> On Wed, Apr 17, 2024 at 12:25 PM Justine Olshan
>>>> <jols...@confluent.io.invalid> wrote:
>>>> 
>>>>> Hey Alieh,
>>>>> 
>>>>> I echo what Omnia says, I'm not sure I understand the implications of
>>>>> the change and I think more detail is needed.
>>>>> 
>>>>> This comment also confused me a bit:
>>>>> * {@code ClientExceptionHandler} that continues the transaction even
>>>>> if a record is too large.
>>>>> * Otherwise, it makes the transaction to fail.
>>>>> 
>>>>> Relatedly, I've been working with some folks on a KIP for transactions
>>>>> errors and how they are handled. Specifically for the
>>>>> RecordTooLargeException (and a few other errors), we want to give a new
>>>>> error category for this error that allows the application to choose
>>>>> how it is handled. Maybe this KIP is something that you are looking for?
>>>>> Stay tuned :)
>>>>> 
>>>>> Justine
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> On Wed, Apr 17, 2024 at 8:03 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
>>>>> wrote:
>>>>> 
>>>>>> Hi Alieh,
>>>>>> Thanks for the KIP! I have a couple of comments:
>>>>>> - You mentioned in the KIP motivation,
>>>>>>> Another example for which a production exception handler could be
>>>>>>> useful is if a user tries to write into a non-existing topic, which
>>>>>>> returns a retryable error code; with infinite retries, the producer
>>>>>>> would hang retrying forever. A handler could help to break the infinite
>>>>>>> retry loop.
>>>>>> 
>>>>>> How can the handler differentiate between something temporary, where it
>>>>>> should keep retrying, and something permanent, like forgetting to create
>>>>>> the topic? Temporary here could be:
>>>>>>   - the producer getting deployed before the topic creation finishes
>>>>>>     (especially if the topic creation is handled via IaC)
>>>>>>   - temporarily offline partitions
>>>>>>   - leadership changes
>>>>>> Isn’t this putting the producer at risk of dropping records
>>>>>> unintentionally?
>>>>>> - Can you please elaborate on what is written in the compatibility /
>>>>>> migration plan section by explaining in a bit more detail what the
>>>>>> changing behaviour is and how it will impact clients who are upgrading?
>>>>>> - In the proposed changes, can you elaborate in the KIP on where in the
>>>>>> producer lifecycle ClientExceptionHandler and
>>>>>> TransactionExceptionHandler will get triggered, and how the producer
>>>>>> will be configured to point to a custom implementation?
>>>>>> 
>>>>>> Thanks
>>>>>> Omnia
>>>>>> 
>>>>>>> On 17 Apr 2024, at 13:13, Alieh Saeedi <asae...@confluent.io.INVALID>
>>>>>>> wrote:
>>>>>>> 
>>>>>>> Hi all,
>>>>>>> Here is the KIP-1038: Add Custom Error Handler to Producer.
>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-1038%3A+Add+Custom+Error+Handler+to+Producer>
>>>>>>>
>>>>>>> I look forward to your feedback!
>>>>>>> 
>>>>>>> Cheers,
>>>>>>> Alieh
>>>>>> 
>>>>>> 
>>>>> 
>>> 
>> 
