Hi,

Thank you, Chris and Justine, for the feedback.


@Chris

1) Flexibility: it has two meanings. The first meaning is the one you
mentioned. We are going to cover more exceptions in the future, but as
Justine mentioned, we must be very conservative about adding more
exceptions. Additionally, flexibility mainly means that the user is able to
develop their own code. As mentioned in the motivation section and the
examples, the user may, for example, decide to drop a record based on its
topic.
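
To make the second meaning concrete, here is a minimal sketch of a
topic-dependent handler. It is only an illustration: the interface shape,
the String topic argument (the KIP passes a ProducerRecord), the class names,
and the exception classes are local stand-ins so that the snippet compiles
without Kafka on the classpath. Only the FAIL/RETRY/SWALLOW names come from
the KIP discussion.

```java
import java.util.Set;

// Sketch only: every type below is a local stand-in, not the real Kafka API.
class TopicAwareHandlerSketch {

    // Response names follow the thread; SWALLOW may still be renamed to SKIP.
    enum Response { FAIL, RETRY, SWALLOW }

    // Stand-in for org.apache.kafka.common.errors.RecordTooLargeException.
    static class RecordTooLargeException extends RuntimeException { }

    // Hypothetical handler shape; the KIP version would receive a ProducerRecord.
    interface ProducerExceptionHandler {
        Response handle(String topic, Exception exception);
    }

    // Drops oversized records only for topics the application marks as droppable.
    static class TopicAwareHandler implements ProducerExceptionHandler {
        private final Set<String> droppableTopics;

        TopicAwareHandler(Set<String> droppableTopics) {
            this.droppableTopics = droppableTopics;
        }

        @Override
        public Response handle(String topic, Exception exception) {
            if (exception instanceof RecordTooLargeException
                    && droppableTopics.contains(topic)) {
                return Response.SWALLOW;
            }
            // Everything else keeps the default behaviour: fail.
            return Response.FAIL;
        }
    }

    public static void main(String[] args) {
        ProducerExceptionHandler handler = new TopicAwareHandler(Set.of("metrics"));
        System.out.println(handler.handle("metrics", new RecordTooLargeException()));
        System.out.println(handler.handle("payments", new RecordTooLargeException()));
    }
}
```

A fixed config option could express "drop oversized records", but not "drop
them only for these topics", which is the kind of decision meant here.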


2) Defining two separate methods for retriable and non-retriable
exceptions: although the idea is brilliant, the user may still make a
mistake by implementing the wrong method and see unexpected behaviour.
For example, they may implement handleRetriable() for RecordTooLargeException
and return SWALLOW for the exception, but in practice see no change in the
default behaviour because they implemented the wrong method. I think we can
never reduce user mistakes to zero.
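
A rough sketch of that mistake, under assumptions: the two method names are
the ones Chris suggested, but the default method bodies, the String topic
argument, the dispatch() helper, and the stand-in exception class are
invented here for illustration and are not part of the KIP.

```java
// Sketch only: local stand-ins, not the real Kafka API.
class TwoMethodHandlerSketch {

    enum NonRetriableResponse { FAIL, SWALLOW }
    enum RetriableResponse { FAIL, RETRY, SWALLOW }

    // Stand-in for Kafka's RecordTooLargeException, which is not retriable.
    static class RecordTooLargeException extends RuntimeException { }

    // Hypothetical two-method interface; the defaults are assumed to mirror
    // today's behaviour (fail on non-retriable, keep retrying on retriable).
    interface ProducerExceptionHandler {
        default NonRetriableResponse handle(String topic, Exception e) {
            return NonRetriableResponse.FAIL;
        }
        default RetriableResponse handleRetriable(String topic, Exception e) {
            return RetriableResponse.RETRY;
        }
    }

    // The user's mistake: SWALLOW is wired into the retriable method only.
    static class MisplacedHandler implements ProducerExceptionHandler {
        @Override
        public RetriableResponse handleRetriable(String topic, Exception e) {
            return e instanceof RecordTooLargeException
                    ? RetriableResponse.SWALLOW
                    : RetriableResponse.RETRY;
        }
    }

    // Hypothetical routing inside the producer, keyed on retriability.
    static String dispatch(ProducerExceptionHandler handler, String topic,
                           Exception e, boolean retriable) {
        return retriable
                ? handler.handleRetriable(topic, e).name()
                : handler.handle(topic, e).name();
    }

    public static void main(String[] args) {
        // RecordTooLargeException is routed as non-retriable, so the
        // overridden method is never consulted and the default FAIL applies.
        System.out.println(dispatch(new MisplacedHandler(), "orders",
                new RecordTooLargeException(), false));
    }
}
```

The code compiles and runs without any warning, yet the record is not
dropped, which is the kind of silent surprise I mean.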



3) Default implementation for Handler: the default behaviour is already
preserved with no need to implement any handler or set the
corresponding config parameter `custom.exception.handler`. What you mean is
actually having a second default, which requires having both interface and
config parameters. About UnknownTopicOrPartitionException: the producer
already offers the config parameter `max.block.ms` which determines the
duration of retrying. The main purpose of the user who needs the handler is
to get the root cause of TimeoutException and handle it in the way they
intend. The KIP explains the necessity of it for KS users.
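
As a sketch of what this looks like on the configuration side (assumptions:
the helper name producerProps, the bootstrap address, and the 30000 ms value
are placeholders, and `custom.exception.handler` is the key proposed in the
KIP, not an existing producer config):

```java
import java.util.Properties;

// Sketch only: builds producer properties, it does not create a producer.
class HandlerConfigSketch {

    // Passing null keeps today's default behaviour: no handler is configured.
    static Properties producerProps(String handlerClassName) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        // Existing config that bounds how long send() may block on metadata.
        props.setProperty("max.block.ms", "30000");
        if (handlerClassName != null) {
            // Config key proposed in the KIP; the class name is hypothetical.
            props.setProperty("custom.exception.handler", handlerClassName);
        }
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps(null));
        System.out.println(producerProps("com.example.MyProducerExceptionHandler"));
    }
}
```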


4) Naming issue: By SWALLOW, we actually meant swallowing the error, while
SKIP means skipping the record, I think. If it makes sense to more people, I
can change it to SKIP.


@Justine

1) was addressed by Chris.

2 and 3) The problem is exactly what you mentioned. Currently, there is no
way to handle these issues application-side. Even KS users who implement KS
ProductionExceptionHandler are not able to handle the exceptions as they
intend since the code does not reach the KS interface and breaks somewhere
in the producer.

Cheers,
Alieh

On Tue, May 7, 2024 at 8:43 PM Chris Egerton <fearthecel...@gmail.com>
wrote:

> Hi Justine,
>
> The method signatures for the interface are indeed open-ended, but the KIP
> states that its uses will be limited. See the motivation section:
>
> > We believe that the user should be able to develop custom exception
> handlers for managing producer exceptions. On the other hand, this will be
> an expert-level API, and using that may result in strange behaviour in the
> system, making it hard to find the root cause. Therefore, the custom
> handler is currently limited to handling RecordTooLargeException and
> UnknownTopicOrPartitionException.
>
> Cheers,
>
> Chris
>
>
> On Tue, May 7, 2024, 14:37 Justine Olshan <jols...@confluent.io.invalid>
> wrote:
>
> > Hi Alieh,
> >
> > I was out for KSB and then was also sick. :(
> >
> > To your point 1) Chris, I don't think it is limited to two specific
> > scenarios, since the interface accepts a generic Exception e and can be
> > implemented to check if that e is an instanceof any exception. I didn't
> see
> > anywhere that specific errors are enforced. I'm a bit concerned about
> this
> > actually. I'm concerned about the open-endedness and the contract we
> have
> > with transactions. We are allowing the client to make decisions that are
> > somewhat invisible to the server. As an aside, can we build in log
> messages
> > when the handler decides to skip etc a message. I'm really concerned
> about
> > messages being silently dropped.
> >
> > I do think Chris's point 2) about retriable vs non retriable errors is
> > fair. I'm a bit concerned about skipping an unknown topic or partition
> > exception too early, as there are cases where it can be transient.
> >
> > I'm still a little bit wary of allowing dropping records as part of EOS
> > generally as in many cases, these errors signify an issue with the
> original
> > data. I understand that streams and connect/mirror maker may have reasons
> > they want to progress past these messages, but wondering if there is a
> way
> > that can be done application-side. I'm willing to accept this sort of
> > proposal if we can make it clear that this sort of thing is happening and
> > we limit the blast radius for what we can do.
> >
> > Justine
> >
> > On Tue, May 7, 2024 at 9:55 AM Chris Egerton <chr...@aiven.io.invalid>
> > wrote:
> >
> > > Hi Alieh,
> > >
> > > Sorry for the delay, I've been out sick. I still have some thoughts
> that
> > > I'd like to see addressed before voting.
> > >
> > > 1) If flexibility is the motivation for a pluggable interface, why are
> we
> > > only limiting the uses for this interface to two very specific
> scenarios?
> > > Why not also allow, e.g., authorization errors to be handled as well
> > > (allowing users to drop records destined for some off-limits topics, or
> > > retry for a limited duration in case there's a delay in the propagation
> > of
> > > ACL updates)? It'd be nice to see some analysis of other errors that
> > could
> > > be handled with this new API, both to avoid the follow-up work of
> another
> > > KIP to address them in the future, and to make sure that we're not
> > painting
> > > ourselves into a corner with the current API in a way that would make
> > > future modifications difficult.
> > >
> > > 2) Something feels a bit off with how retriable vs. non-retriable
> errors
> > > are handled with the interface. Why not introduce two separate methods
> to
> > > handle each case separately? That way there's no ambiguity or implicit
> > > behavior when, e.g., attempting to retry on a RecordTooLargeException.
> > This
> > > could be something like `NonRetriableResponse handle(ProducerRecord,
> > > Exception)` and `RetriableResponse handleRetriable(ProducerRecord,
> > > Exception)`, though the exact names and shape can obviously be toyed
> > with a
> > > bit.
> > >
> > > 3) Although the flexibility of a pluggable interface may benefit some
> > > users' custom producer applications and Kafka Streams applications, it
> > > comes at significant deployment cost for other low-/no-code
> environments,
> > > including but not limited to Kafka Connect and MirrorMaker 2. Can we
> add
> > a
> > > default implementation of the exception handler that allows for some
> > simple
> > > behavior to be tweaked via configuration property? Two things that
> would
> > be
> > > nice to have would be A) an upper bound on the retry time for
> > > unknown-topic-partition exceptions and B) an option to drop records
> that
> > > are large enough to trigger a record-too-large exception.
> > >
> > > 4) I'd still prefer to see "SKIP" or "DROP" instead of the proposed
> > > "SWALLOW" option, which IMO is opaque and non-obvious, especially when
> > > trying to guess the behavior for retriable errors.
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> > > On Fri, May 3, 2024 at 11:23 AM Alieh Saeedi
> > <asae...@confluent.io.invalid
> > > >
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > >
> > > > A summary of the KIP and the discussions:
> > > >
> > > >
> > > > The KIP introduces a handler interface for Producer in order to
> handle
> > > two
> > > > exceptions: RecordTooLargeException and
> > UnknownTopicOrPartitionException.
> > > > The handler handles the exceptions per-record.
> > > >
> > > >
> > > > - Do we need this handler?  [Motivation and Examples sections]
> > > >
> > > >
> > > > RecordTooLargeException: 1) In transactions, the producer collects
> > > multiple
> > > > records in batches. Then a RecordTooLargeException related to a
> single
> > > > record leads to failing the entire batch. A custom exception handler
> in
> > > > this case may decide on dropping the record and continuing the
> > > processing.
> > > > See Example 1, please. 2) Moreover, in Kafka Streams, a record that
> is
> > > too
> > > > large is a poison pill record, and there is no way to skip over it. A
> > > > handler would allow us to react to this error inside the producer,
> > i.e.,
> > > > local to where the error happens, and thus simplify the overall code
> > > > significantly. Please read the Motivation section for more
> explanation.
> > > >
> > > >
> > > > UnknownTopicOrPartitionException: For this case, the producer handles
> > > this
> > > > exception internally and only issues a WARN log about missing
> metadata
> > > and
> > > > retries internally. Later, when the producer hits
> > > > "delivery.timeout.ms" it
> > > > throws a TimeoutException, and the user can only blindly retry, which
> > may
> > > > result in an infinite retry loop. The thrown TimeoutException "cuts"
> > the
> > > > connection to the underlying root cause of missing metadata (which
> > could
> > > > indeed be a transient error but is persistent for a non-existing
> > topic).
> > > > Thus, there is no programmatic way to break the infinite retry loop.
> > > Kafka
> > > > Streams also blindly retries for this case, and the application gets
> > > stuck.
> > > >
> > > >
> > > >
> > > > - Having interface vs configuration option: [Motivation, Examples,
> and
> > > > Rejected Alternatives sections]
> > > >
> > > > Our solution is introducing an interface due to the full flexibility
> > that
> > > > it offers. Sometimes users, especially Kafka Streams ones, determine
> > the
> > > > handler's behaviour based on the situation. For example, facing
> > > > UnknownTopicOrPartitionException, the user may want to raise an
> > > > error for some topics but retry it for other topics. Having a
> > > configuration
> > > > option with a fixed set of possibilities does not serve the user's
> > > > needs. See Example 2, please.
> > > >
> > > >
> > > > - Note on RecordTooLargeException: [Public Interfaces section]
> > > >
> > > > If the custom handler decides on SWALLOW for RecordTooLargeException,
> > > then
> > > > this record will not be a part of the batch of transactions and will
> > also
> > > > not be sent to the broker in non-transactional mode. So no worries
> > about
> > > > getting a RecordTooLargeException from the broker in this case, as
> the
> > > > record will never ever be sent to the broker. SWALLOW means drop the
> > > record
> > > > and continue/swallow the error.
> > > >
> > > >
> > > > - What if the handle() method implements RETRY for
> > > RecordTooLargeException?
> > > > [Proposed Changes section]
> > > >
> > > > We have to limit the user to only have FAIL or SWALLOW for
> > > > RecordTooLargeException. Actually, RETRY must be equal to FAIL. This
> is
> > > > well documented/informed in javadoc.
> > > >
> > > >
> > > >
> > > > - What if the handle() method of the handler throws an exception?
> > > >
> > > > The handler is expected to have correct code. If it throws an
> > exception,
> > > > everything fails.
> > > >
> > > >
> > > >
> > > > This is a PoC PR <https://github.com/apache/kafka/pull/15846> ONLY
> for
> > > > RecordTooLargeException. The code changes related to
> > > > UnknownTopicOrPartitionException will be added to this PR LATER.
> > > >
> > > >
> > > > Looking forward to your feedback again.
> > > >
> > > >
> > > > Cheers,
> > > >
> > > > Alieh
> > > >
> > > > On Thu, Apr 25, 2024 at 11:46 PM Kirk True <k...@kirktrue.pro>
> wrote:
> > > >
> > > > > Hi Alieh,
> > > > >
> > > > > Thanks for the updates!
> > > > >
> > > > > Comments inline...
> > > > >
> > > > >
> > > > > > On Apr 25, 2024, at 1:10 PM, Alieh Saeedi
> > > <asae...@confluent.io.INVALID
> > > > >
> > > > > wrote:
> > > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > > Thanks a lot for the constructive feedback!
> > > > > >
> > > > > >
> > > > > >
> > > > > > Addressing some of the main concerns:
> > > > > >
> > > > > >
> > > > > > - The `RecordTooLargeException` can be thrown by broker, producer
> > and
> > > > > > consumer. Of course, the `ProducerExceptionHandler` interface is
> > > > > introduced
> > > > > > to affect only the exceptions thrown from the producer. This KIP
> > very
> > > > > > specifically means to provide a possibility to manage the
> > > > > > `RecordTooLargeException` thrown from the Producer.send() method.
> > > > Please
> > > > > > see “Proposed Changes” section for more clarity. I investigated
> the
> > > > issue
> > > > > > there thoroughly. I hope it can explain the concern about how we
> > > handle
> > > > > the
> > > > > > errors as well.
> > > > > >
> > > > > >
> > > > > >
> > > > > > - The problem with Callback: Methods of Callback are called when
> > the
> > > > > record
> > > > > > sent to the server is acknowledged, while this is not the desired
> > > time
> > > > > for
> > > > > > all exceptions. We intend to handle exceptions beforehand.
> > > > >
> > > > > I guess it makes sense to keep the expectation for when Callback is
> > > > > invoked as-is vs. shoehorning more into it.
> > > > >
> > > > > > - What if the custom handler returns RETRY for
> > > > > `RecordTooLargeException`? I
> > > > > > assume changing the producer configuration at runtime is
> possible.
> > If
> > > > so,
> > > > > > RETRY for a too large record is valid because maybe in the next
> > try,
> > > > the
> > > > > > too large record is not poisoning any more. I am not 100% sure
> > about
> > > > the
> > > > > > technical details, though. Otherwise, we can consider the RETRY
> as
> > > FAIL
> > > > > for
> > > > > > this exception. Another solution would be to consider a constant
> > > number
> > > > > of
> > > > > > times for RETRY which can be useful for other exceptions as well.
> > > > >
> > > > > It’s not presently possible to change the configuration of an
> > existing
> > > > > Producer at runtime. So if a record hits a RecordTooLargeException
> > > once,
> > > > no
> > > > > amount of retrying (with the current Producer) will change that
> fact.
> > > So
> > > > > I’m still a little stuck on how to handle a response of RETRY for
> an
> > > > > “oversized” record.
> > > > >
> > > > > > - What if the handle() method itself throws an exception? I think
> > > > > > rationally and pragmatically, the behaviour must be exactly like
> > when
> > > > no
> > > > > > custom handler is defined since the user actually did not have a
> > > > working
> > > > > > handler.
> > > > >
> > > > > I’m not convinced that ignoring an errant handler is the right
> > choice.
> > > It
> > > > > then becomes a silent failure that might have repercussions,
> > depending
> > > on
> > > > > the business logic. A user would have to proactively trawl through
> > the
> > > > > logs for WARN/ERROR messages to catch it.
> > > > >
> > > > > Throwing a hard error is pretty draconian, though…
> > > > >
> > > > > > - Why not use config parameters instead of an interface? As
> > explained
> > > > in
> > > > > > the “Rejected Alternatives” section, we assume that the handler
> > will
> > > be
> > > > > > used for a greater number of exceptions in the future. Defining a
> > > > > > configuration parameter for each exception may make the
> > > configuration a
> > > > > bit
> > > > > > messy. Moreover, the handler offers more flexibility.
> > > > >
> > > > > Agreed that the logic-via-configuration approach is weird and
> > limiting.
> > > > > Forget I ever suggested it ;)
> > > > >
> > > > > I’d think additional background in the Motivation section would
> help
> > me
> > > > > understand how users might use this feature beyond a) skipping
> > > > “oversized”
> > > > > records, and b) not retrying missing topics.
> > > > >
> > > > > > Small change:
> > > > > >
> > > > > > -ProductionExceptionHandlerResponse -> Response for brevity and
> > > > > simplicity.
> > > > > > Could’ve been HandlerResponse too I think!
> > > > >
> > > > > The name change sounds good to me.
> > > > >
> > > > > Thanks Alieh!
> > > > >
> > > > > >
> > > > > >
> > > > > > I thank you all again for your useful questions/suggestions.
> > > > > >
> > > > > > I would be happy to hear more of your concerns, as stated in some
> > > > > feedback.
> > > > > >
> > > > > > Cheers,
> > > > > > Alieh
> > > > > >
> > > > > > On Wed, Apr 24, 2024 at 12:31 AM Justine Olshan
> > > > > > <jols...@confluent.io.invalid> wrote:
> > > > > >
> > > > > >> Thanks Alieh for the updates.
> > > > > >>
> > > > > >> I'm a little concerned about the design pattern here. It seems
> > like
> > > we
> > > > > want
> > > > > >> specific usages, but we are packaging it as a generic handler.
> > > > > >> I think we tried to narrow down on the specific errors we want
> to
> > > > > handle,
> > > > > >> but it feels a little clunky as we have a generic thing for two
> > > > specific
> > > > > >> errors.
> > > > > >>
> > > > > >> I'm wondering if we are using the right patterns to solve these
> > > > > problems. I
> > > > > >> agree though that we will need something more than the error
> > classes
> > > > I'm
> > > > > >> proposing if we want to have different handling be configurable.
> > > > > >> My concern is that the open-endedness of a handler means that we
> > are
> > > > > >> creating more problems than we are solving. It is still unclear
> to
> > > me
> > > > > how
> > > > > >> we expect to handle the errors. Perhaps we could include an
> > example?
> > > > It
> > > > > >> seems like there is a specific use case in mind and maybe we can
> > > make
> > > > a
> > > > > >> design that is tighter and supports that case.
> > > > > >>
> > > > > >> Justine
> > > > > >>
> > > > > >> On Tue, Apr 23, 2024 at 3:06 PM Kirk True <k...@kirktrue.pro>
> > > wrote:
> > > > > >>
> > > > > >>> Hi Alieh,
> > > > > >>>
> > > > > >>> Thanks for the KIP!
> > > > > >>>
> > > > > >>> A few questions:
> > > > > >>>
> > > > > >>> K1. What is the expected behavior for the producer if it
> > generates
> > > a
> > > > > >>> RecordTooLargeException, but the handler returns RETRY?
> > > > > >>> K2. How do we determine which Record was responsible for the
> > > > > >>> UnknownTopicOrPartitionException since we get that response
> when
> > > > > >> sending  a
> > > > > >>> batch of records?
> > > > > >>> K3. What is the expected behavior if the handle() method itself
> > > > throws
> > > > > an
> > > > > >>> error?
> > > > > >>> K4. What is the downside of adding an onError() method to the
> > > > > Producer’s
> > > > > >>> Callback interface vs. a new mechanism?
> > > > > >>> K5. Can we change “ProducerExceptionHandlerResponse" to just
> > > > “Response”
> > > > > >>> given that it’s an inner enum?
> > > > > >>> K6. Any recommendation for callback authors to handle different
> > > > > behavior
> > > > > >>> for different topics?
> > > > > >>>
> > > > > >>> I’ll echo what others have said, it would help me understand
> why
> > we
> > > > > want
> > > > > >>> another handler class if there were more examples in the
> > Motivation
> > > > > >>> section. As it stands now, I agree with Chris that the stated
> > > issues
> > > > > >> could
> > > > > >>> be solved by adding two new configuration options:
> > > > > >>>
> > > > > >>>    oversized.record.behavior=fail
> > > > > >>>    retry.on.unknown.topic.or.partition=true
> > > > > >>>
> > > > > >>> What I’m not yet able to wrap my head around is: what exactly
> > would
> > > > the
> > > > > >>> logic in the handler be? I’m not very imaginative, so I’m
> > assuming
> > > > > they’d
> > > > > >>> mostly be if-this-then-that. However, if they’re more
> > complicated,
> > > > I’d
> > > > > >> have
> > > > > >>> other concerns.
> > > > > >>>
> > > > > >>> Thanks,
> > > > > >>> Kirk
> > > > > >>>
> > > > > >>>> On Apr 22, 2024, at 7:38 AM, Alieh Saeedi
> > > > > <asae...@confluent.io.INVALID
> > > > > >>>
> > > > > >>> wrote:
> > > > > >>>>
> > > > > >>>> Thank you all for the feedback!
> > > > > >>>>
> > > > > >>>> Addressing the main concern: The KIP is about giving the user
> > the
> > > > > >> ability
> > > > > >>>> to handle producer exceptions, but to be more conservative and
> > > avoid
> > > > > >>> future
> > > > > >>>> issues, we decided to be limited to a short list of
> exceptions.
> > I
> > > > > >>> included
> > > > > >>>> *RecordTooLargeException* and *UnknownTopicOrPartitionException*.
> > > > > >>>> Open to suggestions for adding some more ;-)
> > > > > >>>>
> > > > > >>>> KIP Updates:
> > > > > >>>> - clarified the way that the user should configure the
> Producer
> > to
> > > > use
> > > > > >>> the
> > > > > >>>> custom handler. I think adding a producer config property is
> the
> > > > > >> cleanest
> > > > > >>>> one.
> > > > > >>>> - changed the *ClientExceptionHandler* to
> > > *ProducerExceptionHandler*
> > > > > to
> > > > > >>> be
> > > > > >>>> closer to what we are changing.
> > > > > >>>> - added the ProducerRecord as the input parameter of the
> > handle()
> > > > > >> method
> > > > > >>> as
> > > > > >>>> well.
> > > > > >>>> - increased the response types to 3 to have fail and two types
> > of
> > > > > >>> continue.
> > > > > >>>> - The default behaviour is having no custom handler, having
> the
> > > > > >>>> corresponding config parameter set to null. Therefore, the KIP
> > > > > provides
> > > > > >>> no
> > > > > >>>> default implementation of the interface.
> > > > > >>>> - We follow the interface solution as described in the
> > > > > >>>> Rejected Alternatives section.
> > > > > >>>>
> > > > > >>>>
> > > > > >>>> Cheers,
> > > > > >>>> Alieh
> > > > > >>>>
> > > > > >>>>
> > > > > >>>> On Thu, Apr 18, 2024 at 8:11 PM Matthias J. Sax <
> > mj...@apache.org
> > > >
> > > > > >>> wrote:
> > > > > >>>>
> > > > > >>>>> Thanks for the KIP Alieh! It addresses an important case for
> > > error
> > > > > >>>>> handling.
> > > > > >>>>>
> > > > > >>>>> I agree that using this handler would be an expert API, as
> > > > mentioned
> > > > > >> by
> > > > > >>>>> a few people. But I don't think it would be a reason to not
> add
> > > it.
> > > > > >> It's
> > > > > >>>>> always a tricky tradeoff what to expose to users and to avoid
> > > foot
> > > > > >> guns,
> > > > > >>>>> but we added similar handlers to Kafka Streams, and have good
> > > > > >> experience
> > > > > >>>>> with it. Hence, I understand, but don't share the concern
> > raised.
> > > > > >>>>>
> > > > > >>>>> I also agree that there is some responsibility by the user to
> > > > > >> understand
> > > > > >>>>> how such a handler should be implemented to not drop data by
> > > > > accident.
> > > > > >>>>> But it seems unavoidable and acceptable.
> > > > > >>>>>
> > > > > >>>>> While I understand that a "simpler / reduced" API (eg via
> > > configs)
> > > > > >> might
> > > > > >>>>> also work, I personally prefer a full handler. Configs have
> the
> > > > same
> > > > > >>>>> issue that they could be miss-used potentially leading to
> > > > incorrectly
> > > > > >>>>> dropped data, but at the same time are less flexible (and
> thus
> > > > maybe
> > > > > >>>>> ever harder to use correctly...?). Base on my experience,
> there
> > > is
> > > > > >> also
> > > > > >>>>> often weird corner case for which it make sense to also drop
> > > > records
> > > > > >> for
> > > > > >>>>> other exceptions, and a full handler has the advantage of
> full
> > > > > >>>>> flexibility and "absolute power!".
> > > > > >>>>>
> > > > > >>>>> To be fair: I don't know the exact code paths of the producer
> > in
> > > > > >>>>> details, so please keep me honest. But my understanding is,
> > that
> > > > the
> > > > > >> KIP
> > > > > >>>>> aims to allow users to react to internal exception, and
> decide
> > to
> > > > > keep
> > > > > >>>>> retrying internally, swallow the error and drop the record,
> or
> > > > raise
> > > > > >> the
> > > > > >>>>> error?
> > > > > >>>>>
> > > > > >>>>> Maybe the KIP would need to be a little bit more precises
> what
> > > > error
> > > > > >> we
> > > > > >>>>> want to cover -- I don't think this list must be exhaustive,
> as
> > > we
> > > > > can
> > > > > >>>>> always do follow up KIP to also apply the handler to other
> > errors
> > > > to
> > > > > >>>>> expand the scope of the handler. The KIP does mention
> examples,
> > > but
> > > > > it
> > > > > >>>>> might be good to explicitly state for what cases the handler
> > gets
> > > > > >>> applied?
> > > > > >>>>>
> > > > > >>>>> I am also not sure if CONTINUE and FAIL are enough options?
> > Don't
> > > > we
> > > > > >>>>> need three options? Or would `CONTINUE` have different
> meaning
> > > > > >> depending
> > > > > >>>>> on the type of error? Ie, for a retryable error `CONTINUE`
> > would
> > > > mean
> > > > > >>>>> keep retrying internally, but for a non-retryable error
> > > `CONTINUE`
> > > > > >> means
> > > > > >>>>> swallow the error and drop the record? This semantic overload
> > > seems
> > > > > >>>>> tricky to reason about by users, so it might better to split
> > > > > >> `CONTINUE`
> > > > > >>>>> into two cases -> `RETRY` and `SWALLOW` (or some better
> names).
> > > > > >>>>>
> > > > > >>>>> Additionally, should we just ship a
> > > `DefaultClientExceptionHandler`
> > > > > >>>>> which would return `FAIL`, for backward compatibility. Or
> don't
> > > > have
> > > > > >> any
> > > > > >>>>> default handler to begin with and allow it to be `null`? I
> > don't
> > > > see
> > > > > >> the
> > > > > >>>>> need for a specific `TransactionExceptionHandler`. To me, the
> > > goal
> > > > > >>>>> should be to not modify the default behavior at all, but to
> > just
> > > > > allow
> > > > > >>>>> users to change the default behavior if there is a need.
> > > > > >>>>>
> > > > > >>>>> What is missing in the KIP, though, is how the handler is passed
> > > > > >>>>> into the producer. Would we need a new config which allows to
> > set
> > > a
> > > > > >>>>> custom handler? And/or would we allow to pass in an instance
> > via
> > > > the
> > > > > >>>>> constructor or add a new method to set a handler?
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>> -Matthias
> > > > > >>>>>
> > > > > >>>>> On 4/18/24 10:02 AM, Andrew Schofield wrote:
> > > > > >>>>>> Hi Alieh,
> > > > > >>>>>> Thanks for the KIP.
> > > > > >>>>>>
> > > > > >>>>>> Exception handling in the Kafka producer and consumer is
> > really
> > > > not
> > > > > >>>>> ideal.
> > > > > >>>>>> It’s even harder working out what’s going on with the
> > consumer.
> > > > > >>>>>>
> > > > > >>>>>> I’m a bit nervous about this KIP and I agree with Chris that
> > it
> > > > > could
> > > > > >>> do
> > > > > >>>>> with additional
> > > > > >>>>>> motivation. This would be an expert-level interface given
> how
> > > > > >>> complicated
> > > > > >>>>>> the exception handling for Kafka has become.
> > > > > >>>>>>
> > > > > >>>>>> 7. The application is not really aware of the batching being
> > > done
> > > > on
> > > > > >>> its
> > > > > >>>>> behalf.
> > > > > >>>>>> The ProduceResponse can actually return an array of records
> > > which
> > > > > >>> failed
> > > > > >>>>>> per batch. If you get RecordTooLargeException, and want to
> > > retry,
> > > > > you
> > > > > >>>>> probably
> > > > > >>>>>> need to remove the offending records from the batch and
> retry
> > > it.
> > > > > >> This
> > > > > >>>>> is getting fiddly.
> > > > > >>>>>>
> > > > > >>>>>> 8. There is already o.a.k.clients.producer.Callback. I
> wonder
> > > > > whether
> > > > > >>> an
> > > > > >>>>>> alternative might be to add a method to the existing
> Callback
> > > > > >>> interface,
> > > > > >>>>> such as:
> > > > > >>>>>>
> > > > > >>>>>>  ClientExceptionResponse onException(Exception exception)
> > > > > >>>>>>
> > > > > >>>>>> It would be called when a ProduceResponse contains an error,
> > but
> > > > the
> > > > > >>>>>> producer is going to retry. It tells the producer whether to
> > go
> > > > > ahead
> > > > > >>>>> with the retry
> > > > > >>>>>> or not. The default implementation would be to CONTINUE,
> > because
> > > > > >> that’s
> > > > > >>>>>> just continuing to retry as planned. Note that this is a
> > > > per-record
> > > > > >>>>> callback, so
> > > > > >>>>>> the application would be able to understand which records
> > > failed.
> > > > > >>>>>>
> > > > > >>>>>> By using an existing interface, we already know how to
> > configure
> > > > it
> > > > > >> and
> > > > > >>>>> we know
> > > > > >>>>>> about the threading model for calling it.
> > > > > >>>>>>
> > > > > >>>>>>
> > > > > >>>>>> Thanks,
> > > > > >>>>>> Andrew
> > > > > >>>>>>
> > > > > >>>>>>
> > > > > >>>>>>
> > > > > >>>>>>> On 17 Apr 2024, at 18:17, Chris Egerton
> > > <chr...@aiven.io.INVALID
> > > > >
> > > > > >>>>> wrote:
> > > > > >>>>>>>
> > > > > >>>>>>> Hi Alieh,
> > > > > >>>>>>>
> > > > > >>>>>>> Thanks for the KIP! The issue with writing to non-existent
> > > topics
> > > > > is
> > > > > >>>>>>> particularly frustrating for users of Kafka Connect and has
> > > been
> > > > > the
> > > > > >>>>> source
> > > > > >>>>>>> of a handful of Jira tickets over the years. My thoughts:
> > > > > >>>>>>>
> > > > > >>>>>>> 1. An additional detail we can add to the motivation (or
> > > possibly
> > > > > >>>>> rejected
> > > > > >>>>>>> alternatives) section is that this kind of custom retry
> logic
> > > > can't
> > > > > >> be
> > > > > >>>>>>> implemented by hand by, e.g., setting retries to 0 in the
> > > > producer
> > > > > >>>>> config
> > > > > >>>>>>> and handling exceptions at the application level. Or
> rather,
> > it
> > > > > can,
> > > > > >>>>> but 1)
> > > > > >>>>>>> it's a bit painful to have to reimplement at every
> call-site
> > > for
> > > > > >>>>>>> Producer::send (and any code that awaits the returned
> Future)
> > > and
> > > > > 2)
> > > > > >>>>> it's
> > > > > >>>>>>> impossible to do this without losing idempotency on
> retries.
> > > > > >>>>>>>
> > > > > >>>>>>> 2. That said, I wonder if a pluggable interface is really
> the
> > > > right
> > > > > >>> call
> > > > > >>>>>>> here. Documenting the interactions of a producer with
> > > > > >>>>>>> a ClientExceptionHandler instance will be tricky, and
> > > > implementing
> > > > > >>> them
> > > > > >>>>>>> will also be a fair amount of work. I believe that there
> > needs
> > > to
> > > > > be
> > > > > >>>>> some
> > > > > >>>>>>> more granularity for how writes to non-existent topics (or
> > > > really,
> > > > > >>>>>>> UNKNOWN_TOPIC_OR_PARTITION and related errors from the
> > broker)
> > > > are
> > > > > >>>>> handled,
> > > > > >>>>>>> but I'm torn between keeping it simple with maybe one or
> two
> > > new
> > > > > >>>>> producer
> > > > > >>>>>>> config properties, or a full-blown pluggable interface. If
> > > there
> > > > > are
> > > > > >>>>> more
> > > > > >>>>>>> cases that would benefit from a pluggable interface, it
> would
> > > be
> > > > > >> nice
> > > > > >>> to
> > > > > >>>>>>> identify these and add them to the KIP to strengthen the
> > > > > motivation.
> > > > > >>>>> Right
> > > > > >>>>>>> now, I'm not sure the two that are mentioned in the
> > motivation
> > > > are
> > > > > >>>>>>> sufficient.
> > > > > >>>>>>>
> > > > > >>>>>>> 3. Alternatively, a possible compromise is for this KIP to
> > > > > introduce
> > > > > >>> new
> > > > > >>>>>>> properties that dictate how to handle
> unknown-topic-partition
> > > and
> > > > > >>>>>>> record-too-large errors, with the thinking that if we
> > > introduce a
> > > > > >>>>> pluggable
> > > > > >>>>>>> interface later on, these properties will be recognized by
> > the
> > > > > >> default
> > > > > >>>>>>> implementation of that interface but could be completely
> > > ignored
> > > > or
> > > > > >>>>>>> replaced by alternative implementations.
> > > > > >>>>>>>
> > > > > >>>>>>> 4. (Nit) You can remove the "This page is meant as a
> template
> > > for
> > > > > >>>>> writing a
> > > > > >>>>>>> KIP..." part from the KIP. It's not a template anymore :)
> > > > > >>>>>>>
> > > > > > >>>>>>> 5. If we do go the pluggable interface route, wouldn't we
> > > > > > >>>>>>> want to add the possibility for retry logic? The simplest
> > > > > > >>>>>>> version of this could be to add a RETRY value to the
> > > > > > >>>>>>> ClientExceptionHandlerResponse enum.
> > > > > >>>>>>>
> > > > > > >>>>>>> 6. I think "SKIP" or "DROP" might be clearer instead of
> > > > > > >>>>>>> "CONTINUE" for the ClientExceptionHandlerResponse enum,
> > > > > > >>>>>>> since they cause records to be dropped.
> > > > > >>>>>>>
> > > > > >>>>>>> Cheers,
> > > > > >>>>>>>
> > > > > >>>>>>> Chris
> > > > > >>>>>>>
> > > > > >>>>>>> On Wed, Apr 17, 2024 at 12:25 PM Justine Olshan
> > > > > >>>>>>> <jols...@confluent.io.invalid> wrote:
> > > > > >>>>>>>
> > > > > >>>>>>>> Hey Alieh,
> > > > > >>>>>>>>
> > > > > > >>>>>>>> I echo what Omnia says, I'm not sure I understand the
> > > > > > >>>>>>>> implications of the change and I think more detail is
> > > > > > >>>>>>>> needed.
> > > > > >>>>>>>>
> > > > > >>>>>>>> This comment also confused me a bit:
> > > > > > >>>>>>>> * {@code ClientExceptionHandler} that continues the
> > > > > > >>>>>>>> transaction even if a record is too large.
> > > > > >>>>>>>> * Otherwise, it makes the transaction to fail.
> > > > > >>>>>>>>
> > > > > > >>>>>>>> Relatedly, I've been working with some folks on a KIP for
> > > > > > >>>>>>>> transaction errors and how they are handled. Specifically
> > > > > > >>>>>>>> for the RecordTooLargeException (and a few other errors),
> > > > > > >>>>>>>> we want to give a new error category for this error that
> > > > > > >>>>>>>> allows the application to choose how it is handled. Maybe
> > > > > > >>>>>>>> this KIP is something that you are looking for? Stay
> > > > > > >>>>>>>> tuned :)
> > > > > >>>>>>>>
> > > > > >>>>>>>> Justine
> > > > > >>>>>>>>
> > > > > >>>>>>>>
> > > > > >>>>>>>>
> > > > > >>>>>>>>
> > > > > >>>>>>>>
> > > > > > >>>>>>>> On Wed, Apr 17, 2024 at 8:03 AM Omnia Ibrahim
> > > > > > >>>>>>>> <o.g.h.ibra...@gmail.com> wrote:
> > > > > >>>>>>>>
> > > > > > >>>>>>>>> Hi Alieh,
> > > > > > >>>>>>>>> Thanks for the KIP! I have a couple of comments:
> > > > > > >>>>>>>>> - You mentioned in the KIP motivation,
> > > > > > >>>>>>>>>> Another example for which a production exception handler
> > > > > > >>>>>>>>>> could be useful is if a user tries to write into a
> > > > > > >>>>>>>>>> non-existing topic, which returns a retryable error
> > > > > > >>>>>>>>>> code; with infinite retries, the producer would hang
> > > > > > >>>>>>>>>> retrying forever. A handler could help to break the
> > > > > > >>>>>>>>>> infinite retry loop.
> > > > > >>>>>>>>>
> > > > > > >>>>>>>>> How can the handler differentiate between something
> > > > > > >>>>>>>>> temporary, where it should keep retrying, and something
> > > > > > >>>>>>>>> permanent, like forgetting to create the topic?
> > > > > > >>>>>>>>> Temporary here could be:
> > > > > > >>>>>>>>>   - the producer gets deployed before the topic creation
> > > > > > >>>>>>>>>     finishes (especially if the topic creation is handled
> > > > > > >>>>>>>>>     via IaC)
> > > > > > >>>>>>>>>   - temporarily offline partitions
> > > > > > >>>>>>>>>   - leadership changing
> > > > > > >>>>>>>>> Isn’t this putting the producer at risk of dropping
> > > > > > >>>>>>>>> records unintentionally?
> > > > > > >>>>>>>>> - Can you please elaborate on the compatibility /
> > > > > > >>>>>>>>> migration plan section by explaining in a bit more detail
> > > > > > >>>>>>>>> what the changing behaviour is and how it will impact
> > > > > > >>>>>>>>> clients who are upgrading?
> > > > > > >>>>>>>>> - In the proposed changes, can you elaborate in the KIP
> > > > > > >>>>>>>>> on where in the producer lifecycle ClientExceptionHandler
> > > > > > >>>>>>>>> and TransactionExceptionHandler will get triggered, and
> > > > > > >>>>>>>>> how the producer will be configured to point to a custom
> > > > > > >>>>>>>>> implementation?
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> Thanks
> > > > > >>>>>>>>> Omnia
> > > > > >>>>>>>>>
> > > > > > >>>>>>>>>> On 17 Apr 2024, at 13:13, Alieh Saeedi
> > > > > > >>>>>>>>>> <asae...@confluent.io.INVALID> wrote:
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Hi all,
> > > > > > >>>>>>>>>> Here is the KIP-1038: Add Custom Error Handler to Producer.
> > > > > > >>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-1038%3A+Add+Custom+Error+Handler+to+Producer>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> I look forward to your feedback!
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Cheers,
> > > > > >>>>>>>>>> Alieh
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>>>
> > > > > >>>>>>
> > > > > >>>>>
> > > > > >>>
> > > > > >>>
> > > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
>
