Alieh,

Having run into issues with not being able to handle producer failures, I
think this is good functionality to have.

With this new functionality proposed at the Producer level, how would
ecosystems that sit on top of it function? Specifically, Connect was
updated a few years ago to allow Source Connect Workers to handle producer
exceptions that would never succeed when the source data was bad.

Knowles

On Tue, Apr 23, 2024 at 5:23 AM Alieh Saeedi <asae...@confluent.io.invalid>
wrote:

> Thanks Matthias. I changed it to `custom.exception.handler`
>
> Alieh
>
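
As a rough illustration of the renamed property, a producer configuration might look like the sketch below. Only the `custom.exception.handler` key comes from this thread; the handler class name `com.example.MyProducerExceptionHandler` and the broker address are placeholders, and the final KIP may settle on a different name.

```java
import java.util.Properties;

class ProducerConfigSketch {
    // Builds producer properties that point at a custom exception handler.
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Key as named in this thread; the class name is hypothetical.
        props.put("custom.exception.handler",
                "com.example.MyProducerExceptionHandler");
        return props;
    }
}
```

Leaving the property unset would correspond to the no-handler default described in the KIP updates quoted below.
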
>
> On Tue, Apr 23, 2024 at 8:47 AM Matthias J. Sax <mj...@apache.org> wrote:
>
> > Thanks Alieh!
> >
> > A few nits:
> >
> >
> > 1) The new config we add for the producer should be mentioned in the
> > "Public Interfaces" section.
> >
> > 2) Why do we use `producer.` prefix for a *producer* config? Should it
> > be `exception.handler` only?
> >
> >
> > -Matthias
> >
> > On 4/22/24 7:38 AM, Alieh Saeedi wrote:
> > > Thank you all for the feedback!
> > >
> > > Addressing the main concern: The KIP is about giving the user the ability
> > > to handle producer exceptions, but to be more conservative and avoid future
> > > issues, we decided to limit it to a short list of exceptions. I included
> > > *RecordTooLargeException* and *UnknownTopicOrPartitionException*. Open to
> > > suggestions for adding some more ;-)
> > >
> > > KIP Updates:
> > > - clarified the way that the user should configure the Producer to use the
> > > custom handler. I think adding a producer config property is the cleanest
> > > one.
> > > - changed the *ClientExceptionHandler* to *ProducerExceptionHandler* to be
> > > closer to what we are changing.
> > > - added the ProducerRecord as the input parameter of the handle() method as
> > > well.
> > > - increased the response types to 3 to have fail and two types of continue.
> > > - The default behaviour is having no custom handler, having the
> > > corresponding config parameter set to null. Therefore, the KIP provides no
> > > default implementation of the interface.
> > > - We follow the interface solution as described in the
> > > Rejected Alternatives section.
> > >
> > >
> > > Cheers,
> > > Alieh
> > >
> > >
> > > On Thu, Apr 18, 2024 at 8:11 PM Matthias J. Sax <mj...@apache.org>
> > > wrote:
> > >
> > >> Thanks for the KIP Alieh! It addresses an important case for error
> > >> handling.
> > >>
> > >> I agree that using this handler would be an expert API, as mentioned by
> > >> a few people. But I don't think it would be a reason to not add it. It's
> > >> always a tricky tradeoff what to expose to users and to avoid foot guns,
> > >> but we added similar handlers to Kafka Streams, and have had good experience
> > >> with them. Hence, I understand, but don't share the concern raised.
> > >>
> > >> I also agree that there is some responsibility by the user to understand
> > >> how such a handler should be implemented to not drop data by accident.
> > >> But it seems unavoidable and acceptable.
> > >>
> > >> While I understand that a "simpler / reduced" API (eg via configs) might
> > >> also work, I personally prefer a full handler. Configs have the same
> > >> issue that they could be misused, potentially leading to incorrectly
> > >> dropped data, but at the same time are less flexible (and thus maybe
> > >> even harder to use correctly...?). Based on my experience, there are also
> > >> often weird corner cases for which it makes sense to also drop records for
> > >> other exceptions, and a full handler has the advantage of full
> > >> flexibility and "absolute power!".
> > >>
> > >> To be fair: I don't know the exact code paths of the producer in
> > >> detail, so please keep me honest. But my understanding is that the KIP
> > >> aims to allow users to react to internal exceptions, and decide to keep
> > >> retrying internally, swallow the error and drop the record, or raise the
> > >> error?
> > >>
> > >> Maybe the KIP would need to be a little bit more precise about which
> > >> errors we want to cover -- I don't think this list must be exhaustive, as
> > >> we can always do a follow-up KIP to also apply the handler to other errors
> > >> to expand the scope of the handler. The KIP does mention examples, but it
> > >> might be good to explicitly state for what cases the handler gets applied?
> > >>
> > >> I am also not sure if CONTINUE and FAIL are enough options? Don't we
> > >> need three options? Or would `CONTINUE` have a different meaning depending
> > >> on the type of error? Ie, for a retryable error `CONTINUE` would mean
> > >> keep retrying internally, but for a non-retryable error `CONTINUE` means
> > >> swallow the error and drop the record? This semantic overload seems
> > >> tricky for users to reason about, so it might be better to split `CONTINUE`
> > >> into two cases -> `RETRY` and `SWALLOW` (or some better names).
> > >>
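
To make the three-way split concrete, here is a minimal sketch, assuming the `RETRY` / `SWALLOW` names floated above and a simplified `handle()` that sees only the exception. The enum, the interface, and the two exception classes are stand-ins defined locally so the snippet compiles without kafka-clients; they are not the KIP's final API.

```java
// Hypothetical response type using the names suggested in this thread.
enum HandlerResponse { FAIL, RETRY, SWALLOW }

// Local stand-ins for the Kafka exceptions of the same name.
class RecordTooLargeException extends RuntimeException { }
class UnknownTopicOrPartitionException extends RuntimeException { }

// Hypothetical handler shape; the real signature is whatever the KIP defines.
interface ProducerExceptionHandlerSketch {
    HandlerResponse handle(Exception exception);
}

class DropOversizedRecords implements ProducerExceptionHandlerSketch {
    @Override
    public HandlerResponse handle(Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            // Non-retryable: swallow the error and drop the record.
            return HandlerResponse.SWALLOW;
        }
        if (exception instanceof UnknownTopicOrPartitionException) {
            // Retryable: keep retrying internally.
            return HandlerResponse.RETRY;
        }
        // Anything else is raised, as it is without a handler.
        return HandlerResponse.FAIL;
    }
}
```

With separate values, each branch states its intent directly instead of relying on the error type to decide what `CONTINUE` means.
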
> > >> Additionally, should we just ship a `DefaultClientExceptionHandler`
> > >> which would return `FAIL`, for backward compatibility? Or not have any
> > >> default handler to begin with and allow it to be `null`? I don't see the
> > >> need for a specific `TransactionExceptionHandler`. To me, the goal
> > >> should be to not modify the default behavior at all, but to just allow
> > >> users to change the default behavior if there is a need.
> > >>
> > >> What is missing in the KIP, though, is how the handler is passed into the
> > >> producer. Would we need a new config which allows setting a
> > >> custom handler? And/or would we allow passing in an instance via the
> > >> constructor or add a new method to set a handler?
> > >>
> > >>
> > >> -Matthias
> > >>
> > >> On 4/18/24 10:02 AM, Andrew Schofield wrote:
> > >>> Hi Alieh,
> > >>> Thanks for the KIP.
> > >>>
> > >>> Exception handling in the Kafka producer and consumer is really not ideal.
> > >>> It’s even harder working out what’s going on with the consumer.
> > >>>
> > >>> I’m a bit nervous about this KIP and I agree with Chris that it could do
> > >>> with additional motivation. This would be an expert-level interface given
> > >>> how complicated the exception handling for Kafka has become.
> > >>>
> > >>> 7. The application is not really aware of the batching being done on its
> > >>> behalf. The ProduceResponse can actually return an array of records which
> > >>> failed per batch. If you get RecordTooLargeException, and want to retry, you
> > >>> probably need to remove the offending records from the batch and retry it.
> > >>> This is getting fiddly.
> > >>>
> > >>> 8. There is already o.a.k.clients.producer.Callback. I wonder whether an
> > >>> alternative might be to add a method to the existing Callback interface,
> > >>> such as:
> > >>>
> > >>>     ClientExceptionResponse onException(Exception exception)
> > >>>
> > >>> It would be called when a ProduceResponse contains an error, but the
> > >>> producer is going to retry. It tells the producer whether to go ahead
> > >>> with the retry or not. The default implementation would be to CONTINUE,
> > >>> because that’s just continuing to retry as planned. Note that this is a
> > >>> per-record callback, so the application would be able to understand which
> > >>> records failed.
> > >>>
> > >>> By using an existing interface, we already know how to configure it and
> > >>> we know about the threading model for calling it.
> > >>>
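
The Callback idea above could be sketched as follows. `CallbackSketch` is a simplified local stand-in for o.a.k.clients.producer.Callback (the metadata argument is reduced to a String), and the `ClientExceptionResponse` values and the `BoundedRetryCallback` class are assumptions made purely for illustration.

```java
// Assumed response values; the thread only names CONTINUE explicitly here.
enum ClientExceptionResponse { CONTINUE, FAIL }

// Simplified stand-in for the existing producer Callback interface.
interface CallbackSketch {
    void onCompletion(String metadata, Exception exception);

    // Proposed addition: the default keeps retrying as planned, so
    // existing callbacks (including lambdas) behave as before.
    default ClientExceptionResponse onException(Exception exception) {
        return ClientExceptionResponse.CONTINUE;
    }
}

// Hypothetical callback that stops retrying on the maxAttempts-th error.
class BoundedRetryCallback implements CallbackSketch {
    private final int maxAttempts;
    private int attempts = 0;

    BoundedRetryCallback(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    @Override
    public void onCompletion(String metadata, Exception exception) {
        // No-op in this sketch.
    }

    @Override
    public ClientExceptionResponse onException(Exception exception) {
        attempts++;
        return attempts < maxAttempts
                ? ClientExceptionResponse.CONTINUE
                : ClientExceptionResponse.FAIL;
    }
}
```

Because the new method has a default body, the interface still has a single abstract method, so callbacks written as lambdas would keep compiling.
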
> > >>>
> > >>> Thanks,
> > >>> Andrew
> > >>>
> > >>>
> > >>>
> > >>>> On 17 Apr 2024, at 18:17, Chris Egerton <chr...@aiven.io.INVALID>
> > >>>> wrote:
> > >>>>
> > >>>> Hi Alieh,
> > >>>>
> > >>>> Thanks for the KIP! The issue with writing to non-existent topics is
> > >>>> particularly frustrating for users of Kafka Connect and has been the source
> > >>>> of a handful of Jira tickets over the years. My thoughts:
> > >>>>
> > >>>> 1. An additional detail we can add to the motivation (or possibly rejected
> > >>>> alternatives) section is that this kind of custom retry logic can't be
> > >>>> implemented by hand by, e.g., setting retries to 0 in the producer config
> > >>>> and handling exceptions at the application level. Or rather, it can, but 1)
> > >>>> it's a bit painful to have to reimplement at every call-site for
> > >>>> Producer::send (and any code that awaits the returned Future) and 2) it's
> > >>>> impossible to do this without losing idempotency on retries.
> > >>>>
> > >>>> 2. That said, I wonder if a pluggable interface is really the right call
> > >>>> here. Documenting the interactions of a producer with
> > >>>> a ClientExceptionHandler instance will be tricky, and implementing them
> > >>>> will also be a fair amount of work. I believe that there needs to be some
> > >>>> more granularity for how writes to non-existent topics (or really,
> > >>>> UNKNOWN_TOPIC_OR_PARTITION and related errors from the broker) are handled,
> > >>>> but I'm torn between keeping it simple with maybe one or two new producer
> > >>>> config properties, or a full-blown pluggable interface. If there are more
> > >>>> cases that would benefit from a pluggable interface, it would be nice to
> > >>>> identify these and add them to the KIP to strengthen the motivation. Right
> > >>>> now, I'm not sure the two that are mentioned in the motivation are
> > >>>> sufficient.
> > >>>>
> > >>>> 3. Alternatively, a possible compromise is for this KIP to introduce new
> > >>>> properties that dictate how to handle unknown-topic-partition and
> > >>>> record-too-large errors, with the thinking that if we introduce a pluggable
> > >>>> interface later on, these properties will be recognized by the default
> > >>>> implementation of that interface but could be completely ignored or
> > >>>> replaced by alternative implementations.
> > >>>>
> > >>>> 4. (Nit) You can remove the "This page is meant as a template for writing a
> > >>>> KIP..." part from the KIP. It's not a template anymore :)
> > >>>>
> > >>>> 5. If we do go the pluggable interface route, wouldn't we want to add the
> > >>>> possibility for retry logic? The simplest version of this could be to add a
> > >>>> RETRY value to the ClientExceptionHandlerResponse enum.
> > >>>>
> > >>>> 6. I think "SKIP" or "DROP" might be clearer instead of "CONTINUE" for
> > >>>> the ClientExceptionHandlerResponse enum, since they cause records to be
> > >>>> dropped.
> > >>>>
> > >>>> Cheers,
> > >>>>
> > >>>> Chris
> > >>>>
> > >>>> On Wed, Apr 17, 2024 at 12:25 PM Justine Olshan
> > >>>> <jols...@confluent.io.invalid> wrote:
> > >>>>
> > >>>>> Hey Alieh,
> > >>>>>
> > >>>>> I echo what Omnia says, I'm not sure I understand the implications of the
> > >>>>> change and I think more detail is needed.
> > >>>>>
> > >>>>> This comment also confused me a bit:
> > >>>>> * {@code ClientExceptionHandler} that continues the transaction even if a
> > >>>>> record is too large.
> > >>>>> * Otherwise, it makes the transaction to fail.
> > >>>>>
> > >>>>> Relatedly, I've been working with some folks on a KIP for transactions
> > >>>>> errors and how they are handled. Specifically for the
> > >>>>> RecordTooLargeException (and a few other errors), we want to give a new
> > >>>>> error category for this error that allows the application to choose how it
> > >>>>> is handled. Maybe this KIP is something that you are looking for? Stay
> > >>>>> tuned :)
> > >>>>>
> > >>>>> Justine
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> On Wed, Apr 17, 2024 at 8:03 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> Hi Alieh,
> > >>>>>> Thanks for the KIP! I have a couple of comments:
> > >>>>>> - You mentioned in the KIP motivation,
> > >>>>>>> Another example for which a production exception handler could be useful
> > >>>>>>> is if a user tries to write into a non-existing topic, which returns a
> > >>>>>>> retryable error code; with infinite retries, the producer would hang
> > >>>>>>> retrying forever. A handler could help to break the infinite retry loop.
> > >>>>>>
> > >>>>>> How can the handler differentiate between something that is temporary,
> > >>>>>> where it should keep retrying, and something permanent, like forgetting to
> > >>>>>> create the topic? Temporary here could be:
> > >>>>>>   - the producer gets deployed before the topic creation finishes
> > >>>>>>     (especially if the topic creation is handled via IaC)
> > >>>>>>   - temporarily offline partitions
> > >>>>>>   - leadership changing
> > >>>>>> Isn’t this putting the producer at risk of dropping records
> > >>>>>> unintentionally?
> > >>>>>> - Can you elaborate more on what is written in the compatibility /
> > >>>>>> migration plan section, please, by explaining in a bit more detail what
> > >>>>>> the changing behaviour is and how this will impact clients who are
> > >>>>>> upgrading?
> > >>>>>> - In the proposed changes, can you elaborate in the KIP on where in the
> > >>>>>> producer lifecycle ClientExceptionHandler and
> > >>>>>> TransactionExceptionHandler will get triggered, and how the producer will
> > >>>>>> be configured to point to a custom implementation?
> > >>>>>>
> > >>>>>> Thanks
> > >>>>>> Omnia
> > >>>>>>
> > >>>>>>> On 17 Apr 2024, at 13:13, Alieh Saeedi <asae...@confluent.io.INVALID>
> > >>>>>>> wrote:
> > >>>>>>>
> > >>>>>>> Hi all,
> > >>>>>>> Here is the KIP-1038: Add Custom Error Handler to Producer.
> > >>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-1038%3A+Add+Custom+Error+Handler+to+Producer>
> > >>>>>>>
> > >>>>>>> I look forward to your feedback!
> > >>>>>>>
> > >>>>>>> Cheers,
> > >>>>>>> Alieh
> > >>>>>>
> > >>>>>>
> > >>>>>
> > >>>
> > >>
> > >
> >
>
