Hi Alieh,

Thanks for the updates!

Comments inline...


> On Apr 25, 2024, at 1:10 PM, Alieh Saeedi <asae...@confluent.io.INVALID> 
> wrote:
> 
> Hi all,
> 
> Thanks a lot for the constructive feedbacks!
> 
> 
> 
> Addressing some of the main concerns:
> 
> 
> - The `RecordTooLargeException` can be thrown by broker, producer and
> consumer. Of course, the `ProducerExceptionHandler` interface is introduced
> to affect only the exceptions thrown from the producer. This KIP very
> specifically means to provide a possibility to manage the
> `RecordTooLargeException` thrown from the Producer.send() method. Please
> see “Proposed Changes” section for more clarity. I investigated the issue
> there thoroughly. I hope it can explain the concern about how we handle the
> errors as well.
> 
> 
> 
> - The problem with Callback: Methods of Callback are called when the record
> sent to the server is acknowledged, while this is not the desired time for
> all exceptions. We intend to handle exceptions beforehand.

I guess it makes sense to keep the expectation for when Callback is invoked 
as-is vs. shoehorning more into it.

> - What if the custom handler returns RETRY for `RecordTooLargeException`? I
> assume changing the producer configuration at runtime is possible. If so,
> RETRY for a too large record is valid because maybe in the next try, the
> too large record is not poisoning any more. I am not 100% sure about the
> technical details, though. Otherwise, we can consider the RETRY as FAIL for
> this exception. Another solution would be to consider a constant number of
> times for RETRY which can be useful for other exceptions as well.

It’s not presently possible to change the configuration of an existing Producer 
at runtime. So if a record hits a RecordTooLargeException once, no amount of 
retrying (with the current Producer) will change that fact. So I’m still a 
little stuck on how to handle a response of RETRY for an “oversized” record. 

> - What if the handle() method itself throws an exception? I think
> rationally and pragmatically, the behaviour must be exactly like when no
> custom handler is defined since the user actually did not have a working
> handler.

I’m not convinced that ignoring an errant handler is the right choice. It then 
becomes a silent failure that might have repercussions, depending on the 
business logic. A user would have to proactively trawls through the logs for 
WARN/ERROR messages to catch it.

Throwing a hard error is pretty draconian, though…

> - Why not use config parameters instead of an interface? As explained in
> the “Rejected Alternatives” section, we assume that the handler will be
> used for a greater number of exceptions in the future. Defining a
> configuration parameter for each exception may make the configuration a bit
> messy. Moreover, the handler offers more flexibility.

Agreed that the logic-via-configuration approach is weird and limiting. Forget 
I ever suggested it ;)

I’d think additional background in the Motivation section would help me 
understand how users might use this feature beyond a) skipping “oversized” 
records, and b) not retrying missing topics. 

> Small change:
> 
> -ProductionExceptionHandlerResponse -> Response for brevity and simplicity.
> Could’ve been HandlerResponse too I think!

The name change sounds good to me.

Thanks Alieh!

> 
> 
> I thank you all again for your useful questions/suggestions.
> 
> I would be happy to hear more of your concerns, as stated in some feedback.
> 
> Cheers,
> Alieh
> 
> On Wed, Apr 24, 2024 at 12:31 AM Justine Olshan
> <jols...@confluent.io.invalid> wrote:
> 
>> Thanks Alieh for the updates.
>> 
>> I'm a little concerned about the design pattern here. It seems like we want
>> specific usages, but we are packaging it as a generic handler.
>> I think we tried to narrow down on the specific errors we want to handle,
>> but it feels a little clunky as we have a generic thing for two specific
>> errors.
>> 
>> I'm wondering if we are using the right patterns to solve these problems. I
>> agree though that we will need something more than the error classes I'm
>> proposing if we want to have different handling be configurable.
>> My concern is that the open-endedness of a handler means that we are
>> creating more problems than we are solving. It is still unclear to me how
>> we expect to handle the errors. Perhaps we could include an example? It
>> seems like there is a specific use case in mind and maybe we can make a
>> design that is tighter and supports that case.
>> 
>> Justine
>> 
>> On Tue, Apr 23, 2024 at 3:06 PM Kirk True <k...@kirktrue.pro> wrote:
>> 
>>> Hi Alieh,
>>> 
>>> Thanks for the KIP!
>>> 
>>> A few questions:
>>> 
>>> K1. What is the expected behavior for the producer if it generates a
>>> RecordTooLargeException, but the handler returns RETRY?
>>> K2. How do we determine which Record was responsible for the
>>> UnknownTopicOrPartitionException since we get that response when
>> sending  a
>>> batch of records?
>>> K3. What is the expected behavior if the handle() method itself throws an
>>> error?
>>> K4. What is the downside of adding an onError() method to the Producer’s
>>> Callback interface vs. a new mechanism?
>>> K5. Can we change “ProducerExceptionHandlerResponse" to just “Response”
>>> given that it’s an inner enum?
>>> K6. Any recommendation for callback authors to handle different behavior
>>> for different topics?
>>> 
>>> I’ll echo what others have said, it would help me understand why we want
>>> another handler class if there were more examples in the Motivation
>>> section. As it stands now, I agree with Chris that the stated issues
>> could
>>> be solved by adding two new configuration options:
>>> 
>>>    oversized.record.behavior=fail
>>>    retry.on.unknown.topic.or.partition=true
>>> 
>>> What I’m not yet able to wrap my head around is: what exactly would the
>>> logic in the handler be? I’m not very imaginative, so I’m assuming they’d
>>> mostly be if-this-then-that. However, if they’re more complicated, I’d
>> have
>>> other concerns.
>>> 
>>> Thanks,
>>> Kirk
>>> 
>>>> On Apr 22, 2024, at 7:38 AM, Alieh Saeedi <asae...@confluent.io.INVALID
>>> 
>>> wrote:
>>>> 
>>>> Thank you all for the feedback!
>>>> 
>>>> Addressing the main concern: The KIP is about giving the user the
>> ability
>>>> to handle producer exceptions, but to be more conservative and avoid
>>> future
>>>> issues, we decided to be limited to a short list of exceptions. I
>>> included
>>>> *RecordTooLargeExceptin* and *UnknownTopicOrPartitionException. *Open
>> to
>>>> suggestion for adding some more ;-)
>>>> 
>>>> KIP Updates:
>>>> - clarified the way that the user should configure the Producer to use
>>> the
>>>> custom handler. I think adding a producer config property is the
>> cleanest
>>>> one.
>>>> - changed the *ClientExceptionHandler* to *ProducerExceptionHandler* to
>>> be
>>>> closer to what we are changing.
>>>> - added the ProducerRecord as the input parameter of the handle()
>> method
>>> as
>>>> well.
>>>> - increased the response types to 3 to have fail and two types of
>>> continue.
>>>> - The default behaviour is having no custom handler, having the
>>>> corresponding config parameter set to null. Therefore, the KIP provides
>>> no
>>>> default implementation of the interface.
>>>> - We follow the interface solution as described in the
>>>> Rejected Alternetives section.
>>>> 
>>>> 
>>>> Cheers,
>>>> Alieh
>>>> 
>>>> 
>>>> On Thu, Apr 18, 2024 at 8:11 PM Matthias J. Sax <mj...@apache.org>
>>> wrote:
>>>> 
>>>>> Thanks for the KIP Alieh! It addresses an important case for error
>>>>> handling.
>>>>> 
>>>>> I agree that using this handler would be an expert API, as mentioned
>> by
>>>>> a few people. But I don't think it would be a reason to not add it.
>> It's
>>>>> always a tricky tradeoff what to expose to users and to avoid foot
>> guns,
>>>>> but we added similar handlers to Kafka Streams, and have good
>> experience
>>>>> with it. Hence, I understand, but don't share the concern raised.
>>>>> 
>>>>> I also agree that there is some responsibility by the user to
>> understand
>>>>> how such a handler should be implemented to not drop data by accident.
>>>>> But it seem unavoidable and acceptable.
>>>>> 
>>>>> While I understand that a "simpler / reduced" API (eg via configs)
>> might
>>>>> also work, I personally prefer a full handler. Configs have the same
>>>>> issue that they could be miss-used potentially leading to incorrectly
>>>>> dropped data, but at the same time are less flexible (and thus maybe
>>>>> ever harder to use correctly...?). Base on my experience, there is
>> also
>>>>> often weird corner case for which it make sense to also drop records
>> for
>>>>> other exceptions, and a full handler has the advantage of full
>>>>> flexibility and "absolute power!".
>>>>> 
>>>>> To be fair: I don't know the exact code paths of the producer in
>>>>> details, so please keep me honest. But my understanding is, that the
>> KIP
>>>>> aims to allow users to react to internal exception, and decide to keep
>>>>> retrying internally, swallow the error and drop the record, or raise
>> the
>>>>> error?
>>>>> 
>>>>> Maybe the KIP would need to be a little bit more precises what error
>> we
>>>>> want to cover -- I don't think this list must be exhaustive, as we can
>>>>> always do follow up KIP to also apply the handler to other errors to
>>>>> expand the scope of the handler. The KIP does mention examples, but it
>>>>> might be good to explicitly state for what cases the handler gets
>>> applied?
>>>>> 
>>>>> I am also not sure if CONTINUE and FAIL are enough options? Don't we
>>>>> need three options? Or would `CONTINUE` have different meaning
>> depending
>>>>> on the type of error? Ie, for a retryable error `CONTINUE` would mean
>>>>> keep retrying internally, but for a non-retryable error `CONTINUE`
>> means
>>>>> swallow the error and drop the record? This semantic overload seems
>>>>> tricky to reason about by users, so it might better to split
>> `CONTINUE`
>>>>> into two cases -> `RETRY` and `SWALLOW` (or some better names).
>>>>> 
>>>>> Additionally, should we just ship a `DefaultClientExceptionHandler`
>>>>> which would return `FAIL`, for backward compatibility. Or don't have
>> any
>>>>> default handler to begin with and allow it to be `null`? I don't see
>> the
>>>>> need for a specific `TransactionExceptionHandler`. To me, the goal
>>>>> should be to not modify the default behavior at all, but to just allow
>>>>> users to change the default behavior if there is a need.
>>>>> 
>>>>> What is missing on the KIP though it, how the handler is passed into
>> the
>>>>> producer thought? Would we need a new config which allows to set a
>>>>> custom handler? And/or would we allow to pass in an instance via the
>>>>> constructor or add a new method to set a handler?
>>>>> 
>>>>> 
>>>>> -Matthias
>>>>> 
>>>>> On 4/18/24 10:02 AM, Andrew Schofield wrote:
>>>>>> Hi Alieh,
>>>>>> Thanks for the KIP.
>>>>>> 
>>>>>> Exception handling in the Kafka producer and consumer is really not
>>>>> ideal.
>>>>>> It’s even harder working out what’s going on with the consumer.
>>>>>> 
>>>>>> I’m a bit nervous about this KIP and I agree with Chris that it could
>>> do
>>>>> with additional
>>>>>> motivation. This would be an expert-level interface given how
>>> complicated
>>>>>> the exception handling for Kafka has become.
>>>>>> 
>>>>>> 7. The application is not really aware of the batching being done on
>>> its
>>>>> behalf.
>>>>>> The ProduceResponse can actually return an array of records which
>>> failed
>>>>>> per batch. If you get RecordTooLargeException, and want to retry, you
>>>>> probably
>>>>>> need to remove the offending records from the batch and retry it.
>> This
>>>>> is getting fiddly.
>>>>>> 
>>>>>> 8. There is already o.a.k.clients.producer.Callback. I wonder whether
>>> an
>>>>>> alternative might be to add a method to the existing Callback
>>> interface,
>>>>> such as:
>>>>>> 
>>>>>>  ClientExceptionResponse onException(Exception exception)
>>>>>> 
>>>>>> It would be called when a ProduceResponse contains an error, but the
>>>>>> producer is going to retry. It tells the producer whether to go ahead
>>>>> with the retry
>>>>>> or not. The default implementation would be to CONTINUE, because
>> that’s
>>>>>> just continuing to retry as planned. Note that this is a per-record
>>>>> callback, so
>>>>>> the application would be able to understand which records failed.
>>>>>> 
>>>>>> By using an existing interface, we already know how to configure it
>> and
>>>>> we know
>>>>>> about the threading model for calling it.
>>>>>> 
>>>>>> 
>>>>>> Thanks,
>>>>>> Andrew
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>>> On 17 Apr 2024, at 18:17, Chris Egerton <chr...@aiven.io.INVALID>
>>>>> wrote:
>>>>>>> 
>>>>>>> Hi Alieh,
>>>>>>> 
>>>>>>> Thanks for the KIP! The issue with writing to non-existent topics is
>>>>>>> particularly frustrating for users of Kafka Connect and has been the
>>>>> source
>>>>>>> of a handful of Jira tickets over the years. My thoughts:
>>>>>>> 
>>>>>>> 1. An additional detail we can add to the motivation (or possibly
>>>>> rejected
>>>>>>> alternatives) section is that this kind of custom retry logic can't
>> be
>>>>>>> implemented by hand by, e.g., setting retries to 0 in the producer
>>>>> config
>>>>>>> and handling exceptions at the application level. Or rather, it can,
>>>>> but 1)
>>>>>>> it's a bit painful to have to reimplement at every call-site for
>>>>>>> Producer::send (and any code that awaits the returned Future) and 2)
>>>>> it's
>>>>>>> impossible to do this without losing idempotency on retries.
>>>>>>> 
>>>>>>> 2. That said, I wonder if a pluggable interface is really the right
>>> call
>>>>>>> here. Documenting the interactions of a producer with
>>>>>>> a ClientExceptionHandler instance will be tricky, and implementing
>>> them
>>>>>>> will also be a fair amount of work. I believe that there needs to be
>>>>> some
>>>>>>> more granularity for how writes to non-existent topics (or really,
>>>>>>> UNKNOWN_TOPIC_OR_PARTITION and related errors from the broker) are
>>>>> handled,
>>>>>>> but I'm torn between keeping it simple with maybe one or two new
>>>>> producer
>>>>>>> config properties, or a full-blown pluggable interface. If there are
>>>>> more
>>>>>>> cases that would benefit from a pluggable interface, it would be
>> nice
>>> to
>>>>>>> identify these and add them to the KIP to strengthen the motivation.
>>>>> Right
>>>>>>> now, I'm not sure the two that are mentioned in the motivation are
>>>>>>> sufficient.
>>>>>>> 
>>>>>>> 3. Alternatively, a possible compromise is for this KIP to introduce
>>> new
>>>>>>> properties that dictate how to handle unknown-topic-partition and
>>>>>>> record-too-large errors, with the thinking that if we introduce a
>>>>> pluggable
>>>>>>> interface later on, these properties will be recognized by the
>> default
>>>>>>> implementation of that interface but could be completely ignored or
>>>>>>> replaced by alternative implementations.
>>>>>>> 
>>>>>>> 4. (Nit) You can remove the "This page is meant as a template for
>>>>> writing a
>>>>>>> KIP..." part from the KIP. It's not a template anymore :)
>>>>>>> 
>>>>>>> 5. If we do go the pluggable interface route, wouldn't we want to
>> add
>>>>> the
>>>>>>> possibility for retry logic? The simplest version of this could be
>> to
>>>>> add a
>>>>>>> RETRY value to the ClientExceptionHandlerResponse enum.
>>>>>>> 
>>>>>>> 6. I think "SKIP" or "DROP" might be clearer instead of "CONTINUE"
>> for
>>>>>>> the ClientExceptionHandlerResponse enum, since they cause records to
>>> be
>>>>>>> dropped.
>>>>>>> 
>>>>>>> Cheers,
>>>>>>> 
>>>>>>> Chris
>>>>>>> 
>>>>>>> On Wed, Apr 17, 2024 at 12:25 PM Justine Olshan
>>>>>>> <jols...@confluent.io.invalid> wrote:
>>>>>>> 
>>>>>>>> Hey Alieh,
>>>>>>>> 
>>>>>>>> I echo what Omnia says, I'm not sure I understand the implications
>> of
>>>>> the
>>>>>>>> change and I think more detail is needed.
>>>>>>>> 
>>>>>>>> This comment also confused me a bit:
>>>>>>>> * {@code ClientExceptionHandler} that continues the transaction
>> even
>>>>> if a
>>>>>>>> record is too large.
>>>>>>>> * Otherwise, it makes the transaction to fail.
>>>>>>>> 
>>>>>>>> Relatedly, I've been working with some folks on a KIP for
>>> transactions
>>>>>>>> errors and how they are handled. Specifically for the
>>>>>>>> RecordTooLargeException (and a few other errors), we want to give a
>>> new
>>>>>>>> error category for this error that allows the application to choose
>>>>> how it
>>>>>>>> is handled. Maybe this KIP is something that you are looking for?
>>> Stay
>>>>>>>> tuned :)
>>>>>>>> 
>>>>>>>> Justine
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Wed, Apr 17, 2024 at 8:03 AM Omnia Ibrahim <
>>> o.g.h.ibra...@gmail.com
>>>>>> 
>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> Hi Alieh,
>>>>>>>>> Thanks for the KIP! I have couple of comments
>>>>>>>>> - You mentioned in the KIP motivation,
>>>>>>>>>> Another example for which a production exception handler could be
>>>>>>>> useful
>>>>>>>>> is if a user tries to write into a non-existing topic, which
>>> returns a
>>>>>>>>> retryable error code; with infinite retries, the producer would
>> hang
>>>>>>>>> retrying forever. A handler could help to break the infinite retry
>>>>> loop.
>>>>>>>>> 
>>>>>>>>> How the handler can differentiate between something that is
>>> temporary
>>>>> and
>>>>>>>>> it should keep retrying and something permanent like forgot to
>>> create
>>>>> the
>>>>>>>>> topic? temporary here could be
>>>>>>>>> the producer get deployed before the topic creation finish
>>> (specially
>>>>> if
>>>>>>>>> the topic creation is handled via IaC)
>>>>>>>>> temporary offline partitions
>>>>>>>>> leadership changing
>>>>>>>>>       Isn’t this putting the producer at risk of dropping records
>>>>>>>>> unintentionally?
>>>>>>>>> - Can you elaborate more on what is written in the compatibility /
>>>>>>>>> migration plan section please by explaining in bit more details
>> what
>>>>> is
>>>>>>>> the
>>>>>>>>> changing behaviour and how this will impact client who are
>>> upgrading?
>>>>>>>>> - In the proposal changes can you elaborate in the KIP where in
>> the
>>>>>>>>> producer lifecycle will ClientExceptionHandler and
>>>>>>>>> TransactionExceptionHandler get triggered, and how will the
>> producer
>>>>>>>>> configure them to point to costumed implementation.
>>>>>>>>> 
>>>>>>>>> Thanks
>>>>>>>>> Omnia
>>>>>>>>> 
>>>>>>>>>> On 17 Apr 2024, at 13:13, Alieh Saeedi
>>> <asae...@confluent.io.INVALID
>>>>>> 
>>>>>>>>> wrote:
>>>>>>>>>> 
>>>>>>>>>> Hi all,
>>>>>>>>>> Here is the KIP-1038: Add Custom Error Handler to Producer.
>>>>>>>>>> <
>>>>>>>>> 
>>>>>>>> 
>>>>> 
>>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1038%3A+Add+Custom+Error+Handler+to+Producer
>>>>>>>>>> 
>>>>>>>>>> I look forward to your feedback!
>>>>>>>>>> 
>>>>>>>>>> Cheers,
>>>>>>>>>> Alieh
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>>> 
>>> 
>>> 
>> 

Reply via email to