Hi all,

A summary of the KIP and the discussions:


The KIP introduces a handler interface for Producer in order to handle two
exceptions: RecordTooLargeException and UnknownTopicOrPartitionException.
The handler handles the exceptions per-record.


- Do we need this handler?  [Motivation and Examples sections]


RecordTooLargeException: 1) In transactions, the producer collects multiple
records in batches. Then a RecordTooLargeException related to a single
record leads to failing the entire batch. A custom exception handler in
this case may decide on dropping the record and continuing the processing.
See Example 1, please. 2) More over, in Kafka Streams, a record that is too
large is a poison pill record, and there is no way to skip over it. A
handler would allow us to react to this error inside the producer, i.e.,
local to where the error happens, and thus simplify the overall code
significantly. Please read the Motivation section for more explanation.


UnknownTopicOrPartitionException: For this case, the producer handles this
exception internally and only issues a WARN log about missing metadata and
retries internally. Later, when the producer hits "deliver.timeout.ms"  it
throws a TimeoutException, and the user can only blindly retry, which may
result in an infinite retry loop. The thrown TimeoutException "cuts" the
connection to the underlying root cause of missing metadata (which could
indeed be a transient error but is persistent for a non-existing topic).
Thus, there is no programmatic way to break the infinite retry loop. Kafka
Streams also blindly retries for this case, and the application gets stuck.



- Having interface vs configuration option: [Motivation, Examples, and
Rejected Alternatives sections]

Our solution is introducing an interface due to the full flexibility that
it offers. Sometimes users, especially Kafka Streams ones, determine the
handler's behaviour based on the situation. For example, f
acing UnknownTopicOrPartitionException*, *the user may want to raise an
error for some topics but retry it for other topics. Having a configuration
option with a fixed set of possibilities does not serve the user's
needs. See Example 2, please.


- Note on RecordTooLargeException: [Public Interfaces section]

If the custom handler decides on SWALLOW for RecordTooLargeException, then
this record will not be a part of the batch of transactions and will also
not be sent to the broker in non-transactional mode. So no worries about
getting a RecordTooLargeException from the broker in this case, as the
record will never ever be sent to the broker. SWALLOW means drop the record
and continue/swallow the error.


- What if the handle() method implements RETRY for RecordTooLargeException?
[Proposed Changes section]

We have to limit the user to only have FAIL or SWALLOW for
RecordTooLargeException. Actually, RETRY must be equal to FAIL. This is
well documented/informed in javadoc.



- What if the handle() method of the handler throws an exception?

The handler is expected to have correct code. If it throws an exception,
everything fails.



This is a PoC PR <https://github.com/apache/kafka/pull/15846> ONLY for
RecordTooLargeException. The code changes related to
UnknownTopicOrPartitionException will be added to this PR LATER.


Looking forward to your feedback again.


Cheers,

Alieh

On Thu, Apr 25, 2024 at 11:46 PM Kirk True <k...@kirktrue.pro> wrote:

> Hi Alieh,
>
> Thanks for the updates!
>
> Comments inline...
>
>
> > On Apr 25, 2024, at 1:10 PM, Alieh Saeedi <asae...@confluent.io.INVALID>
> wrote:
> >
> > Hi all,
> >
> > Thanks a lot for the constructive feedbacks!
> >
> >
> >
> > Addressing some of the main concerns:
> >
> >
> > - The `RecordTooLargeException` can be thrown by broker, producer and
> > consumer. Of course, the `ProducerExceptionHandler` interface is
> introduced
> > to affect only the exceptions thrown from the producer. This KIP very
> > specifically means to provide a possibility to manage the
> > `RecordTooLargeException` thrown from the Producer.send() method. Please
> > see “Proposed Changes” section for more clarity. I investigated the issue
> > there thoroughly. I hope it can explain the concern about how we handle
> the
> > errors as well.
> >
> >
> >
> > - The problem with Callback: Methods of Callback are called when the
> record
> > sent to the server is acknowledged, while this is not the desired time
> for
> > all exceptions. We intend to handle exceptions beforehand.
>
> I guess it makes sense to keep the expectation for when Callback is
> invoked as-is vs. shoehorning more into it.
>
> > - What if the custom handler returns RETRY for
> `RecordTooLargeException`? I
> > assume changing the producer configuration at runtime is possible. If so,
> > RETRY for a too large record is valid because maybe in the next try, the
> > too large record is not poisoning any more. I am not 100% sure about the
> > technical details, though. Otherwise, we can consider the RETRY as FAIL
> for
> > this exception. Another solution would be to consider a constant number
> of
> > times for RETRY which can be useful for other exceptions as well.
>
> It’s not presently possible to change the configuration of an existing
> Producer at runtime. So if a record hits a RecordTooLargeException once, no
> amount of retrying (with the current Producer) will change that fact. So
> I’m still a little stuck on how to handle a response of RETRY for an
> “oversized” record.
>
> > - What if the handle() method itself throws an exception? I think
> > rationally and pragmatically, the behaviour must be exactly like when no
> > custom handler is defined since the user actually did not have a working
> > handler.
>
> I’m not convinced that ignoring an errant handler is the right choice. It
> then becomes a silent failure that might have repercussions, depending on
> the business logic. A user would have to proactively trawls through the
> logs for WARN/ERROR messages to catch it.
>
> Throwing a hard error is pretty draconian, though…
>
> > - Why not use config parameters instead of an interface? As explained in
> > the “Rejected Alternatives” section, we assume that the handler will be
> > used for a greater number of exceptions in the future. Defining a
> > configuration parameter for each exception may make the configuration a
> bit
> > messy. Moreover, the handler offers more flexibility.
>
> Agreed that the logic-via-configuration approach is weird and limiting.
> Forget I ever suggested it ;)
>
> I’d think additional background in the Motivation section would help me
> understand how users might use this feature beyond a) skipping “oversized”
> records, and b) not retrying missing topics.
>
> > Small change:
> >
> > -ProductionExceptionHandlerResponse -> Response for brevity and
> simplicity.
> > Could’ve been HandlerResponse too I think!
>
> The name change sounds good to me.
>
> Thanks Alieh!
>
> >
> >
> > I thank you all again for your useful questions/suggestions.
> >
> > I would be happy to hear more of your concerns, as stated in some
> feedback.
> >
> > Cheers,
> > Alieh
> >
> > On Wed, Apr 24, 2024 at 12:31 AM Justine Olshan
> > <jols...@confluent.io.invalid> wrote:
> >
> >> Thanks Alieh for the updates.
> >>
> >> I'm a little concerned about the design pattern here. It seems like we
> want
> >> specific usages, but we are packaging it as a generic handler.
> >> I think we tried to narrow down on the specific errors we want to
> handle,
> >> but it feels a little clunky as we have a generic thing for two specific
> >> errors.
> >>
> >> I'm wondering if we are using the right patterns to solve these
> problems. I
> >> agree though that we will need something more than the error classes I'm
> >> proposing if we want to have different handling be configurable.
> >> My concern is that the open-endedness of a handler means that we are
> >> creating more problems than we are solving. It is still unclear to me
> how
> >> we expect to handle the errors. Perhaps we could include an example? It
> >> seems like there is a specific use case in mind and maybe we can make a
> >> design that is tighter and supports that case.
> >>
> >> Justine
> >>
> >> On Tue, Apr 23, 2024 at 3:06 PM Kirk True <k...@kirktrue.pro> wrote:
> >>
> >>> Hi Alieh,
> >>>
> >>> Thanks for the KIP!
> >>>
> >>> A few questions:
> >>>
> >>> K1. What is the expected behavior for the producer if it generates a
> >>> RecordTooLargeException, but the handler returns RETRY?
> >>> K2. How do we determine which Record was responsible for the
> >>> UnknownTopicOrPartitionException since we get that response when
> >> sending  a
> >>> batch of records?
> >>> K3. What is the expected behavior if the handle() method itself throws
> an
> >>> error?
> >>> K4. What is the downside of adding an onError() method to the
> Producer’s
> >>> Callback interface vs. a new mechanism?
> >>> K5. Can we change “ProducerExceptionHandlerResponse" to just “Response”
> >>> given that it’s an inner enum?
> >>> K6. Any recommendation for callback authors to handle different
> behavior
> >>> for different topics?
> >>>
> >>> I’ll echo what others have said, it would help me understand why we
> want
> >>> another handler class if there were more examples in the Motivation
> >>> section. As it stands now, I agree with Chris that the stated issues
> >> could
> >>> be solved by adding two new configuration options:
> >>>
> >>>    oversized.record.behavior=fail
> >>>    retry.on.unknown.topic.or.partition=true
> >>>
> >>> What I’m not yet able to wrap my head around is: what exactly would the
> >>> logic in the handler be? I’m not very imaginative, so I’m assuming
> they’d
> >>> mostly be if-this-then-that. However, if they’re more complicated, I’d
> >> have
> >>> other concerns.
> >>>
> >>> Thanks,
> >>> Kirk
> >>>
> >>>> On Apr 22, 2024, at 7:38 AM, Alieh Saeedi
> <asae...@confluent.io.INVALID
> >>>
> >>> wrote:
> >>>>
> >>>> Thank you all for the feedback!
> >>>>
> >>>> Addressing the main concern: The KIP is about giving the user the
> >> ability
> >>>> to handle producer exceptions, but to be more conservative and avoid
> >>> future
> >>>> issues, we decided to be limited to a short list of exceptions. I
> >>> included
> >>>> *RecordTooLargeExceptin* and *UnknownTopicOrPartitionException. *Open
> >> to
> >>>> suggestion for adding some more ;-)
> >>>>
> >>>> KIP Updates:
> >>>> - clarified the way that the user should configure the Producer to use
> >>> the
> >>>> custom handler. I think adding a producer config property is the
> >> cleanest
> >>>> one.
> >>>> - changed the *ClientExceptionHandler* to *ProducerExceptionHandler*
> to
> >>> be
> >>>> closer to what we are changing.
> >>>> - added the ProducerRecord as the input parameter of the handle()
> >> method
> >>> as
> >>>> well.
> >>>> - increased the response types to 3 to have fail and two types of
> >>> continue.
> >>>> - The default behaviour is having no custom handler, having the
> >>>> corresponding config parameter set to null. Therefore, the KIP
> provides
> >>> no
> >>>> default implementation of the interface.
> >>>> - We follow the interface solution as described in the
> >>>> Rejected Alternetives section.
> >>>>
> >>>>
> >>>> Cheers,
> >>>> Alieh
> >>>>
> >>>>
> >>>> On Thu, Apr 18, 2024 at 8:11 PM Matthias J. Sax <mj...@apache.org>
> >>> wrote:
> >>>>
> >>>>> Thanks for the KIP Alieh! It addresses an important case for error
> >>>>> handling.
> >>>>>
> >>>>> I agree that using this handler would be an expert API, as mentioned
> >> by
> >>>>> a few people. But I don't think it would be a reason to not add it.
> >> It's
> >>>>> always a tricky tradeoff what to expose to users and to avoid foot
> >> guns,
> >>>>> but we added similar handlers to Kafka Streams, and have good
> >> experience
> >>>>> with it. Hence, I understand, but don't share the concern raised.
> >>>>>
> >>>>> I also agree that there is some responsibility by the user to
> >> understand
> >>>>> how such a handler should be implemented to not drop data by
> accident.
> >>>>> But it seem unavoidable and acceptable.
> >>>>>
> >>>>> While I understand that a "simpler / reduced" API (eg via configs)
> >> might
> >>>>> also work, I personally prefer a full handler. Configs have the same
> >>>>> issue that they could be miss-used potentially leading to incorrectly
> >>>>> dropped data, but at the same time are less flexible (and thus maybe
> >>>>> ever harder to use correctly...?). Base on my experience, there is
> >> also
> >>>>> often weird corner case for which it make sense to also drop records
> >> for
> >>>>> other exceptions, and a full handler has the advantage of full
> >>>>> flexibility and "absolute power!".
> >>>>>
> >>>>> To be fair: I don't know the exact code paths of the producer in
> >>>>> details, so please keep me honest. But my understanding is, that the
> >> KIP
> >>>>> aims to allow users to react to internal exception, and decide to
> keep
> >>>>> retrying internally, swallow the error and drop the record, or raise
> >> the
> >>>>> error?
> >>>>>
> >>>>> Maybe the KIP would need to be a little bit more precises what error
> >> we
> >>>>> want to cover -- I don't think this list must be exhaustive, as we
> can
> >>>>> always do follow up KIP to also apply the handler to other errors to
> >>>>> expand the scope of the handler. The KIP does mention examples, but
> it
> >>>>> might be good to explicitly state for what cases the handler gets
> >>> applied?
> >>>>>
> >>>>> I am also not sure if CONTINUE and FAIL are enough options? Don't we
> >>>>> need three options? Or would `CONTINUE` have different meaning
> >> depending
> >>>>> on the type of error? Ie, for a retryable error `CONTINUE` would mean
> >>>>> keep retrying internally, but for a non-retryable error `CONTINUE`
> >> means
> >>>>> swallow the error and drop the record? This semantic overload seems
> >>>>> tricky to reason about by users, so it might better to split
> >> `CONTINUE`
> >>>>> into two cases -> `RETRY` and `SWALLOW` (or some better names).
> >>>>>
> >>>>> Additionally, should we just ship a `DefaultClientExceptionHandler`
> >>>>> which would return `FAIL`, for backward compatibility. Or don't have
> >> any
> >>>>> default handler to begin with and allow it to be `null`? I don't see
> >> the
> >>>>> need for a specific `TransactionExceptionHandler`. To me, the goal
> >>>>> should be to not modify the default behavior at all, but to just
> allow
> >>>>> users to change the default behavior if there is a need.
> >>>>>
> >>>>> What is missing on the KIP though it, how the handler is passed into
> >> the
> >>>>> producer thought? Would we need a new config which allows to set a
> >>>>> custom handler? And/or would we allow to pass in an instance via the
> >>>>> constructor or add a new method to set a handler?
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>> On 4/18/24 10:02 AM, Andrew Schofield wrote:
> >>>>>> Hi Alieh,
> >>>>>> Thanks for the KIP.
> >>>>>>
> >>>>>> Exception handling in the Kafka producer and consumer is really not
> >>>>> ideal.
> >>>>>> It’s even harder working out what’s going on with the consumer.
> >>>>>>
> >>>>>> I’m a bit nervous about this KIP and I agree with Chris that it
> could
> >>> do
> >>>>> with additional
> >>>>>> motivation. This would be an expert-level interface given how
> >>> complicated
> >>>>>> the exception handling for Kafka has become.
> >>>>>>
> >>>>>> 7. The application is not really aware of the batching being done on
> >>> its
> >>>>> behalf.
> >>>>>> The ProduceResponse can actually return an array of records which
> >>> failed
> >>>>>> per batch. If you get RecordTooLargeException, and want to retry,
> you
> >>>>> probably
> >>>>>> need to remove the offending records from the batch and retry it.
> >> This
> >>>>> is getting fiddly.
> >>>>>>
> >>>>>> 8. There is already o.a.k.clients.producer.Callback. I wonder
> whether
> >>> an
> >>>>>> alternative might be to add a method to the existing Callback
> >>> interface,
> >>>>> such as:
> >>>>>>
> >>>>>>  ClientExceptionResponse onException(Exception exception)
> >>>>>>
> >>>>>> It would be called when a ProduceResponse contains an error, but the
> >>>>>> producer is going to retry. It tells the producer whether to go
> ahead
> >>>>> with the retry
> >>>>>> or not. The default implementation would be to CONTINUE, because
> >> that’s
> >>>>>> just continuing to retry as planned. Note that this is a per-record
> >>>>> callback, so
> >>>>>> the application would be able to understand which records failed.
> >>>>>>
> >>>>>> By using an existing interface, we already know how to configure it
> >> and
> >>>>> we know
> >>>>>> about the threading model for calling it.
> >>>>>>
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Andrew
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>> On 17 Apr 2024, at 18:17, Chris Egerton <chr...@aiven.io.INVALID>
> >>>>> wrote:
> >>>>>>>
> >>>>>>> Hi Alieh,
> >>>>>>>
> >>>>>>> Thanks for the KIP! The issue with writing to non-existent topics
> is
> >>>>>>> particularly frustrating for users of Kafka Connect and has been
> the
> >>>>> source
> >>>>>>> of a handful of Jira tickets over the years. My thoughts:
> >>>>>>>
> >>>>>>> 1. An additional detail we can add to the motivation (or possibly
> >>>>> rejected
> >>>>>>> alternatives) section is that this kind of custom retry logic can't
> >> be
> >>>>>>> implemented by hand by, e.g., setting retries to 0 in the producer
> >>>>> config
> >>>>>>> and handling exceptions at the application level. Or rather, it
> can,
> >>>>> but 1)
> >>>>>>> it's a bit painful to have to reimplement at every call-site for
> >>>>>>> Producer::send (and any code that awaits the returned Future) and
> 2)
> >>>>> it's
> >>>>>>> impossible to do this without losing idempotency on retries.
> >>>>>>>
> >>>>>>> 2. That said, I wonder if a pluggable interface is really the right
> >>> call
> >>>>>>> here. Documenting the interactions of a producer with
> >>>>>>> a ClientExceptionHandler instance will be tricky, and implementing
> >>> them
> >>>>>>> will also be a fair amount of work. I believe that there needs to
> be
> >>>>> some
> >>>>>>> more granularity for how writes to non-existent topics (or really,
> >>>>>>> UNKNOWN_TOPIC_OR_PARTITION and related errors from the broker) are
> >>>>> handled,
> >>>>>>> but I'm torn between keeping it simple with maybe one or two new
> >>>>> producer
> >>>>>>> config properties, or a full-blown pluggable interface. If there
> are
> >>>>> more
> >>>>>>> cases that would benefit from a pluggable interface, it would be
> >> nice
> >>> to
> >>>>>>> identify these and add them to the KIP to strengthen the
> motivation.
> >>>>> Right
> >>>>>>> now, I'm not sure the two that are mentioned in the motivation are
> >>>>>>> sufficient.
> >>>>>>>
> >>>>>>> 3. Alternatively, a possible compromise is for this KIP to
> introduce
> >>> new
> >>>>>>> properties that dictate how to handle unknown-topic-partition and
> >>>>>>> record-too-large errors, with the thinking that if we introduce a
> >>>>> pluggable
> >>>>>>> interface later on, these properties will be recognized by the
> >> default
> >>>>>>> implementation of that interface but could be completely ignored or
> >>>>>>> replaced by alternative implementations.
> >>>>>>>
> >>>>>>> 4. (Nit) You can remove the "This page is meant as a template for
> >>>>> writing a
> >>>>>>> KIP..." part from the KIP. It's not a template anymore :)
> >>>>>>>
> >>>>>>> 5. If we do go the pluggable interface route, wouldn't we want to
> >> add
> >>>>> the
> >>>>>>> possibility for retry logic? The simplest version of this could be
> >> to
> >>>>> add a
> >>>>>>> RETRY value to the ClientExceptionHandlerResponse enum.
> >>>>>>>
> >>>>>>> 6. I think "SKIP" or "DROP" might be clearer instead of "CONTINUE"
> >> for
> >>>>>>> the ClientExceptionHandlerResponse enum, since they cause records
> to
> >>> be
> >>>>>>> dropped.
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>>
> >>>>>>> Chris
> >>>>>>>
> >>>>>>> On Wed, Apr 17, 2024 at 12:25 PM Justine Olshan
> >>>>>>> <jols...@confluent.io.invalid> wrote:
> >>>>>>>
> >>>>>>>> Hey Alieh,
> >>>>>>>>
> >>>>>>>> I echo what Omnia says, I'm not sure I understand the implications
> >> of
> >>>>> the
> >>>>>>>> change and I think more detail is needed.
> >>>>>>>>
> >>>>>>>> This comment also confused me a bit:
> >>>>>>>> * {@code ClientExceptionHandler} that continues the transaction
> >> even
> >>>>> if a
> >>>>>>>> record is too large.
> >>>>>>>> * Otherwise, it makes the transaction to fail.
> >>>>>>>>
> >>>>>>>> Relatedly, I've been working with some folks on a KIP for
> >>> transactions
> >>>>>>>> errors and how they are handled. Specifically for the
> >>>>>>>> RecordTooLargeException (and a few other errors), we want to give
> a
> >>> new
> >>>>>>>> error category for this error that allows the application to
> choose
> >>>>> how it
> >>>>>>>> is handled. Maybe this KIP is something that you are looking for?
> >>> Stay
> >>>>>>>> tuned :)
> >>>>>>>>
> >>>>>>>> Justine
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Wed, Apr 17, 2024 at 8:03 AM Omnia Ibrahim <
> >>> o.g.h.ibra...@gmail.com
> >>>>>>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Alieh,
> >>>>>>>>> Thanks for the KIP! I have couple of comments
> >>>>>>>>> - You mentioned in the KIP motivation,
> >>>>>>>>>> Another example for which a production exception handler could
> be
> >>>>>>>> useful
> >>>>>>>>> is if a user tries to write into a non-existing topic, which
> >>> returns a
> >>>>>>>>> retryable error code; with infinite retries, the producer would
> >> hang
> >>>>>>>>> retrying forever. A handler could help to break the infinite
> retry
> >>>>> loop.
> >>>>>>>>>
> >>>>>>>>> How the handler can differentiate between something that is
> >>> temporary
> >>>>> and
> >>>>>>>>> it should keep retrying and something permanent like forgot to
> >>> create
> >>>>> the
> >>>>>>>>> topic? temporary here could be
> >>>>>>>>> the producer get deployed before the topic creation finish
> >>> (specially
> >>>>> if
> >>>>>>>>> the topic creation is handled via IaC)
> >>>>>>>>> temporary offline partitions
> >>>>>>>>> leadership changing
> >>>>>>>>>       Isn’t this putting the producer at risk of dropping records
> >>>>>>>>> unintentionally?
> >>>>>>>>> - Can you elaborate more on what is written in the compatibility
> /
> >>>>>>>>> migration plan section please by explaining in bit more details
> >> what
> >>>>> is
> >>>>>>>> the
> >>>>>>>>> changing behaviour and how this will impact client who are
> >>> upgrading?
> >>>>>>>>> - In the proposal changes can you elaborate in the KIP where in
> >> the
> >>>>>>>>> producer lifecycle will ClientExceptionHandler and
> >>>>>>>>> TransactionExceptionHandler get triggered, and how will the
> >> producer
> >>>>>>>>> configure them to point to costumed implementation.
> >>>>>>>>>
> >>>>>>>>> Thanks
> >>>>>>>>> Omnia
> >>>>>>>>>
> >>>>>>>>>> On 17 Apr 2024, at 13:13, Alieh Saeedi
> >>> <asae...@confluent.io.INVALID
> >>>>>>
> >>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Hi all,
> >>>>>>>>>> Here is the KIP-1038: Add Custom Error Handler to Producer.
> >>>>>>>>>> <
> >>>>>>>>>
> >>>>>>>>
> >>>>>
> >>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1038%3A+Add+Custom+Error+Handler+to+Producer
> >>>>>>>>>>
> >>>>>>>>>> I look forward to your feedback!
> >>>>>>>>>>
> >>>>>>>>>> Cheers,
> >>>>>>>>>> Alieh
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>
> >>>
> >>>
> >>
>
>

Reply via email to