Alieh and Chris,

Thanks for clarifying 1) but I saw the motivation. I guess I just didn't
understand how that would be ensured on the producer side. I saw the sample
code -- is it just an if statement checking for the error before the
handler is invoked? That seems a bit fragile.

Can you clarify what you mean by `since the code does not reach the KS
interface and breaks somewhere in producer.` If we surfaced this error to
the application in a better way would that also be a solution to the issue?

Justine

On Tue, May 7, 2024 at 1:55 PM Alieh Saeedi <asae...@confluent.io.invalid>
wrote:

> Hi,
>
>
> Thank you, Chris and Justine, for the feedback.
>
>
> @Chris
>
> 1) Flexibility: it has two meanings. The first meaning is the one you
> mentioned. We are going to cover more exceptions in the future, but as
> Justine mentioned, we must be very conservative about adding more
> exceptions. Additionally, flexibility mainly means that the user is able to
> develop their own code. As mentioned in the motivation section and the
> examples, sometimes the user decides on dropping a record based on the
> topic, for example.
>
>
> 2) Defining two separate methods for retriable and non-retriable
> exceptions: although the idea is brilliant, the user may still make a
> mistake by implementing the wrong method and see a non-expecting behaviour.
> For example, he may implement handleRetriable() for RecordTooLargeException
> and define SWALLOW for the exception, but in practice, he sees no change in
> default behaviour since he implemented the wrong method. I think we can
> never reduce the user’s mistakes to 0.
>
>
>
> 3) Default implementation for Handler: the default behaviour is already
> preserved with NO need of implementing any handler or setting the
> corresponding config parameter `custom.exception.handler`. What you mean is
> actually having a second default, which requires having both interface and
> config parameters. About UnknownTopicOrPartitionException: the producer
> already offers the config parameter `max.block.ms` which determines the
> duration of retrying. The main purpose of the user who needs the handler is
> to get the root cause of TimeoutException and handle it in the way he
> intends. The KIP explains the necessity of it for KS users.
>
>
> 4) Naming issue: By SWALLOW, we meant actually swallow the error, while
> SKIP means skip the record, I think. If it makes sense for more ppl, I can
> change it to SKIP
>
>
> @Justine
>
> 1) was addressed by Chris.
>
> 2 and 3) The problem is exactly what you mentioned. Currently, there is no
> way to handle these issues application-side. Even KS users who implement KS
> ProductionExceptionHandler are not able to handle the exceptions as they
> intend since the code does not reach the KS interface and breaks somewhere
> in producer.
>
> Cheers,
> Alieh
>
> On Tue, May 7, 2024 at 8:43 PM Chris Egerton <fearthecel...@gmail.com>
> wrote:
>
> > Hi Justine,
> >
> > The method signatures for the interface are indeed open-ended, but the
> KIP
> > states that its uses will be limited. See the motivation section:
> >
> > > We believe that the user should be able to develop custom exception
> > handlers for managing producer exceptions. On the other hand, this will
> be
> > an expert-level API, and using that may result in strange behaviour in
> the
> > system, making it hard to find the root cause. Therefore, the custom
> > handler is currently limited to handling RecordTooLargeException and
> > UnknownTopicOrPartitionException.
> >
> > Cheers,
> >
> > Chris
> >
> >
> > On Tue, May 7, 2024, 14:37 Justine Olshan <jols...@confluent.io.invalid>
> > wrote:
> >
> > > Hi Alieh,
> > >
> > > I was out for KSB and then was also sick. :(
> > >
> > > To your point 1) Chris, I don't think it is limited to two specific
> > > scenarios, since the interface accepts a generic Exception e and can be
> > > implemented to check if that e is an instanceof any exception. I didn't
> > see
> > > anywhere that specific errors are enforced. I'm a bit concerned about
> > this
> > > actually. I'm concerned about the opened-endedness and the contract we
> > have
> > > with transactions. We are allowing the client to make decisions that
> are
> > > somewhat invisible to the server. As an aside, can we build in log
> > messages
> > > when the handler decides to skip etc a message. I'm really concerned
> > about
> > > messages being silently dropped.
> > >
> > > I do think Chris's point 2) about retriable vs non retriable errors is
> > > fair. I'm a bit concerned about skipping a unknown topic or partition
> > > exception too early, as there are cases where it can be transient.
> > >
> > > I'm still a little bit wary of allowing dropping records as part of EOS
> > > generally as in many cases, these errors signify an issue with the
> > original
> > > data. I understand that streams and connect/mirror maker may have
> reasons
> > > they want to progress past these messages, but wondering if there is a
> > way
> > > that can be done application-side. I'm willing to accept this sort of
> > > proposal if we can make it clear that this sort of thing is happening
> and
> > > we limit the blast radius for what we can do.
> > >
> > > Justine
> > >
> > > On Tue, May 7, 2024 at 9:55 AM Chris Egerton <chr...@aiven.io.invalid>
> > > wrote:
> > >
> > > > Hi Alieh,
> > > >
> > > > Sorry for the delay, I've been out sick. I still have some thoughts
> > that
> > > > I'd like to see addressed before voting.
> > > >
> > > > 1) If flexibility is the motivation for a pluggable interface, why
> are
> > we
> > > > only limiting the uses for this interface to two very specific
> > scenarios?
> > > > Why not also allow, e.g., authorization errors to be handled as well
> > > > (allowing users to drop records destined for some off-limits topics,
> or
> > > > retry for a limited duration in case there's a delay in the
> propagation
> > > of
> > > > ACL updates)? It'd be nice to see some analysis of other errors that
> > > could
> > > > be handled with this new API, both to avoid the follow-up work of
> > another
> > > > KIP to address them in the future, and to make sure that we're not
> > > painting
> > > > ourselves into a corner with the current API in a way that would make
> > > > future modifications difficult.
> > > >
> > > > 2) Something feels a bit off with how retriable vs. non-retriable
> > errors
> > > > are handled with the interface. Why not introduce two separate
> methods
> > to
> > > > handle each case separately? That way there's no ambiguity or
> implicit
> > > > behavior when, e.g., attempting to retry on a
> RecordTooLargeException.
> > > This
> > > > could be something like `NonRetriableResponse handle(ProducerRecord,
> > > > Exception)` and `RetriableResponse handleRetriable(ProducerRecord,
> > > > Exception)`, though the exact names and shape can obviously be toyed
> > > with a
> > > > bit.
> > > >
> > > > 3) Although the flexibility of a pluggable interface may benefit some
> > > > users' custom producer applications and Kafka Streams applications,
> it
> > > > comes at significant deployment cost for other low-/no-code
> > environments,
> > > > including but not limited to Kafka Connect and MirrorMaker 2. Can we
> > add
> > > a
> > > > default implementation of the exception handler that allows for some
> > > simple
> > > > behavior to be tweaked via configuration property? Two things that
> > would
> > > be
> > > > nice to have would be A) an upper bound on the retry time for
> > > > unknown-topic-partition exceptions and B) an option to drop records
> > that
> > > > are large enough to trigger a record-too-large exception.
> > > >
> > > > 4) I'd still prefer to see "SKIP" or "DROP" instead of the proposed
> > > > "SWALLOW" option, which IMO is opaque and non-obvious, especially
> when
> > > > trying to guess the behavior for retriable errors.
> > > >
> > > > Cheers,
> > > >
> > > > Chris
> > > >
> > > > On Fri, May 3, 2024 at 11:23 AM Alieh Saeedi
> > > <asae...@confluent.io.invalid
> > > > >
> > > > wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > >
> > > > > A summary of the KIP and the discussions:
> > > > >
> > > > >
> > > > > The KIP introduces a handler interface for Producer in order to
> > handle
> > > > two
> > > > > exceptions: RecordTooLargeException and
> > > UnknownTopicOrPartitionException.
> > > > > The handler handles the exceptions per-record.
> > > > >
> > > > >
> > > > > - Do we need this handler?  [Motivation and Examples sections]
> > > > >
> > > > >
> > > > > RecordTooLargeException: 1) In transactions, the producer collects
> > > > multiple
> > > > > records in batches. Then a RecordTooLargeException related to a
> > single
> > > > > record leads to failing the entire batch. A custom exception
> handler
> > in
> > > > > this case may decide on dropping the record and continuing the
> > > > processing.
> > > > > See Example 1, please. 2) More over, in Kafka Streams, a record
> that
> > is
> > > > too
> > > > > large is a poison pill record, and there is no way to skip over
> it. A
> > > > > handler would allow us to react to this error inside the producer,
> > > i.e.,
> > > > > local to where the error happens, and thus simplify the overall
> code
> > > > > significantly. Please read the Motivation section for more
> > explanation.
> > > > >
> > > > >
> > > > > UnknownTopicOrPartitionException: For this case, the producer
> handles
> > > > this
> > > > > exception internally and only issues a WARN log about missing
> > metadata
> > > > and
> > > > > retries internally. Later, when the producer hits "
> > deliver.timeout.ms"
> > > > it
> > > > > throws a TimeoutException, and the user can only blindly retry,
> which
> > > may
> > > > > result in an infinite retry loop. The thrown TimeoutException
> "cuts"
> > > the
> > > > > connection to the underlying root cause of missing metadata (which
> > > could
> > > > > indeed be a transient error but is persistent for a non-existing
> > > topic).
> > > > > Thus, there is no programmatic way to break the infinite retry
> loop.
> > > > Kafka
> > > > > Streams also blindly retries for this case, and the application
> gets
> > > > stuck.
> > > > >
> > > > >
> > > > >
> > > > > - Having interface vs configuration option: [Motivation, Examples,
> > and
> > > > > Rejected Alternatives sections]
> > > > >
> > > > > Our solution is introducing an interface due to the full
> flexibility
> > > that
> > > > > it offers. Sometimes users, especially Kafka Streams ones,
> determine
> > > the
> > > > > handler's behaviour based on the situation. For example, f
> > > > > acing UnknownTopicOrPartitionException*, *the user may want to
> raise
> > an
> > > > > error for some topics but retry it for other topics. Having a
> > > > configuration
> > > > > option with a fixed set of possibilities does not serve the user's
> > > > > needs. See Example 2, please.
> > > > >
> > > > >
> > > > > - Note on RecordTooLargeException: [Public Interfaces section]
> > > > >
> > > > > If the custom handler decides on SWALLOW for
> RecordTooLargeException,
> > > > then
> > > > > this record will not be a part of the batch of transactions and
> will
> > > also
> > > > > not be sent to the broker in non-transactional mode. So no worries
> > > about
> > > > > getting a RecordTooLargeException from the broker in this case, as
> > the
> > > > > record will never ever be sent to the broker. SWALLOW means drop
> the
> > > > record
> > > > > and continue/swallow the error.
> > > > >
> > > > >
> > > > > - What if the handle() method implements RETRY for
> > > > RecordTooLargeException?
> > > > > [Proposed Changes section]
> > > > >
> > > > > We have to limit the user to only have FAIL or SWALLOW for
> > > > > RecordTooLargeException. Actually, RETRY must be equal to FAIL.
> This
> > is
> > > > > well documented/informed in javadoc.
> > > > >
> > > > >
> > > > >
> > > > > - What if the handle() method of the handler throws an exception?
> > > > >
> > > > > The handler is expected to have correct code. If it throws an
> > > exception,
> > > > > everything fails.
> > > > >
> > > > >
> > > > >
> > > > > This is a PoC PR <https://github.com/apache/kafka/pull/15846> ONLY
> > for
> > > > > RecordTooLargeException. The code changes related to
> > > > > UnknownTopicOrPartitionException will be added to this PR LATER.
> > > > >
> > > > >
> > > > > Looking forward to your feedback again.
> > > > >
> > > > >
> > > > > Cheers,
> > > > >
> > > > > Alieh
> > > > >
> > > > > On Thu, Apr 25, 2024 at 11:46 PM Kirk True <k...@kirktrue.pro>
> > wrote:
> > > > >
> > > > > > Hi Alieh,
> > > > > >
> > > > > > Thanks for the updates!
> > > > > >
> > > > > > Comments inline...
> > > > > >
> > > > > >
> > > > > > > On Apr 25, 2024, at 1:10 PM, Alieh Saeedi
> > > > <asae...@confluent.io.INVALID
> > > > > >
> > > > > > wrote:
> > > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > Thanks a lot for the constructive feedbacks!
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > Addressing some of the main concerns:
> > > > > > >
> > > > > > >
> > > > > > > - The `RecordTooLargeException` can be thrown by broker,
> producer
> > > and
> > > > > > > consumer. Of course, the `ProducerExceptionHandler` interface
> is
> > > > > > introduced
> > > > > > > to affect only the exceptions thrown from the producer. This
> KIP
> > > very
> > > > > > > specifically means to provide a possibility to manage the
> > > > > > > `RecordTooLargeException` thrown from the Producer.send()
> method.
> > > > > Please
> > > > > > > see “Proposed Changes” section for more clarity. I investigated
> > the
> > > > > issue
> > > > > > > there thoroughly. I hope it can explain the concern about how
> we
> > > > handle
> > > > > > the
> > > > > > > errors as well.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > - The problem with Callback: Methods of Callback are called
> when
> > > the
> > > > > > record
> > > > > > > sent to the server is acknowledged, while this is not the
> desired
> > > > time
> > > > > > for
> > > > > > > all exceptions. We intend to handle exceptions beforehand.
> > > > > >
> > > > > > I guess it makes sense to keep the expectation for when Callback
> is
> > > > > > invoked as-is vs. shoehorning more into it.
> > > > > >
> > > > > > > - What if the custom handler returns RETRY for
> > > > > > `RecordTooLargeException`? I
> > > > > > > assume changing the producer configuration at runtime is
> > possible.
> > > If
> > > > > so,
> > > > > > > RETRY for a too large record is valid because maybe in the next
> > > try,
> > > > > the
> > > > > > > too large record is not poisoning any more. I am not 100% sure
> > > about
> > > > > the
> > > > > > > technical details, though. Otherwise, we can consider the RETRY
> > as
> > > > FAIL
> > > > > > for
> > > > > > > this exception. Another solution would be to consider a
> constant
> > > > number
> > > > > > of
> > > > > > > times for RETRY which can be useful for other exceptions as
> well.
> > > > > >
> > > > > > It’s not presently possible to change the configuration of an
> > > existing
> > > > > > Producer at runtime. So if a record hits a
> RecordTooLargeException
> > > > once,
> > > > > no
> > > > > > amount of retrying (with the current Producer) will change that
> > fact.
> > > > So
> > > > > > I’m still a little stuck on how to handle a response of RETRY for
> > an
> > > > > > “oversized” record.
> > > > > >
> > > > > > > - What if the handle() method itself throws an exception? I
> think
> > > > > > > rationally and pragmatically, the behaviour must be exactly
> like
> > > when
> > > > > no
> > > > > > > custom handler is defined since the user actually did not have
> a
> > > > > working
> > > > > > > handler.
> > > > > >
> > > > > > I’m not convinced that ignoring an errant handler is the right
> > > choice.
> > > > It
> > > > > > then becomes a silent failure that might have repercussions,
> > > depending
> > > > on
> > > > > > the business logic. A user would have to proactively trawls
> through
> > > the
> > > > > > logs for WARN/ERROR messages to catch it.
> > > > > >
> > > > > > Throwing a hard error is pretty draconian, though…
> > > > > >
> > > > > > > - Why not use config parameters instead of an interface? As
> > > explained
> > > > > in
> > > > > > > the “Rejected Alternatives” section, we assume that the handler
> > > will
> > > > be
> > > > > > > used for a greater number of exceptions in the future.
> Defining a
> > > > > > > configuration parameter for each exception may make the
> > > > configuration a
> > > > > > bit
> > > > > > > messy. Moreover, the handler offers more flexibility.
> > > > > >
> > > > > > Agreed that the logic-via-configuration approach is weird and
> > > limiting.
> > > > > > Forget I ever suggested it ;)
> > > > > >
> > > > > > I’d think additional background in the Motivation section would
> > help
> > > me
> > > > > > understand how users might use this feature beyond a) skipping
> > > > > “oversized”
> > > > > > records, and b) not retrying missing topics.
> > > > > >
> > > > > > > Small change:
> > > > > > >
> > > > > > > -ProductionExceptionHandlerResponse -> Response for brevity and
> > > > > > simplicity.
> > > > > > > Could’ve been HandlerResponse too I think!
> > > > > >
> > > > > > The name change sounds good to me.
> > > > > >
> > > > > > Thanks Alieh!
> > > > > >
> > > > > > >
> > > > > > >
> > > > > > > I thank you all again for your useful questions/suggestions.
> > > > > > >
> > > > > > > I would be happy to hear more of your concerns, as stated in
> some
> > > > > > feedback.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Alieh
> > > > > > >
> > > > > > > On Wed, Apr 24, 2024 at 12:31 AM Justine Olshan
> > > > > > > <jols...@confluent.io.invalid> wrote:
> > > > > > >
> > > > > > >> Thanks Alieh for the updates.
> > > > > > >>
> > > > > > >> I'm a little concerned about the design pattern here. It seems
> > > like
> > > > we
> > > > > > want
> > > > > > >> specific usages, but we are packaging it as a generic handler.
> > > > > > >> I think we tried to narrow down on the specific errors we want
> > to
> > > > > > handle,
> > > > > > >> but it feels a little clunky as we have a generic thing for
> two
> > > > > specific
> > > > > > >> errors.
> > > > > > >>
> > > > > > >> I'm wondering if we are using the right patterns to solve
> these
> > > > > > problems. I
> > > > > > >> agree though that we will need something more than the error
> > > classes
> > > > > I'm
> > > > > > >> proposing if we want to have different handling be
> configurable.
> > > > > > >> My concern is that the open-endedness of a handler means that
> we
> > > are
> > > > > > >> creating more problems than we are solving. It is still
> unclear
> > to
> > > > me
> > > > > > how
> > > > > > >> we expect to handle the errors. Perhaps we could include an
> > > example?
> > > > > It
> > > > > > >> seems like there is a specific use case in mind and maybe we
> can
> > > > make
> > > > > a
> > > > > > >> design that is tighter and supports that case.
> > > > > > >>
> > > > > > >> Justine
> > > > > > >>
> > > > > > >> On Tue, Apr 23, 2024 at 3:06 PM Kirk True <k...@kirktrue.pro>
> > > > wrote:
> > > > > > >>
> > > > > > >>> Hi Alieh,
> > > > > > >>>
> > > > > > >>> Thanks for the KIP!
> > > > > > >>>
> > > > > > >>> A few questions:
> > > > > > >>>
> > > > > > >>> K1. What is the expected behavior for the producer if it
> > > generates
> > > > a
> > > > > > >>> RecordTooLargeException, but the handler returns RETRY?
> > > > > > >>> K2. How do we determine which Record was responsible for the
> > > > > > >>> UnknownTopicOrPartitionException since we get that response
> > when
> > > > > > >> sending  a
> > > > > > >>> batch of records?
> > > > > > >>> K3. What is the expected behavior if the handle() method
> itself
> > > > > throws
> > > > > > an
> > > > > > >>> error?
> > > > > > >>> K4. What is the downside of adding an onError() method to the
> > > > > > Producer’s
> > > > > > >>> Callback interface vs. a new mechanism?
> > > > > > >>> K5. Can we change “ProducerExceptionHandlerResponse" to just
> > > > > “Response”
> > > > > > >>> given that it’s an inner enum?
> > > > > > >>> K6. Any recommendation for callback authors to handle
> different
> > > > > > behavior
> > > > > > >>> for different topics?
> > > > > > >>>
> > > > > > >>> I’ll echo what others have said, it would help me understand
> > why
> > > we
> > > > > > want
> > > > > > >>> another handler class if there were more examples in the
> > > Motivation
> > > > > > >>> section. As it stands now, I agree with Chris that the stated
> > > > issues
> > > > > > >> could
> > > > > > >>> be solved by adding two new configuration options:
> > > > > > >>>
> > > > > > >>>    oversized.record.behavior=fail
> > > > > > >>>    retry.on.unknown.topic.or.partition=true
> > > > > > >>>
> > > > > > >>> What I’m not yet able to wrap my head around is: what exactly
> > > would
> > > > > the
> > > > > > >>> logic in the handler be? I’m not very imaginative, so I’m
> > > assuming
> > > > > > they’d
> > > > > > >>> mostly be if-this-then-that. However, if they’re more
> > > complicated,
> > > > > I’d
> > > > > > >> have
> > > > > > >>> other concerns.
> > > > > > >>>
> > > > > > >>> Thanks,
> > > > > > >>> Kirk
> > > > > > >>>
> > > > > > >>>> On Apr 22, 2024, at 7:38 AM, Alieh Saeedi
> > > > > > <asae...@confluent.io.INVALID
> > > > > > >>>
> > > > > > >>> wrote:
> > > > > > >>>>
> > > > > > >>>> Thank you all for the feedback!
> > > > > > >>>>
> > > > > > >>>> Addressing the main concern: The KIP is about giving the
> user
> > > the
> > > > > > >> ability
> > > > > > >>>> to handle producer exceptions, but to be more conservative
> and
> > > > avoid
> > > > > > >>> future
> > > > > > >>>> issues, we decided to be limited to a short list of
> > exceptions.
> > > I
> > > > > > >>> included
> > > > > > >>>> *RecordTooLargeExceptin* and
> > *UnknownTopicOrPartitionException.
> > > > > *Open
> > > > > > >> to
> > > > > > >>>> suggestion for adding some more ;-)
> > > > > > >>>>
> > > > > > >>>> KIP Updates:
> > > > > > >>>> - clarified the way that the user should configure the
> > Producer
> > > to
> > > > > use
> > > > > > >>> the
> > > > > > >>>> custom handler. I think adding a producer config property is
> > the
> > > > > > >> cleanest
> > > > > > >>>> one.
> > > > > > >>>> - changed the *ClientExceptionHandler* to
> > > > *ProducerExceptionHandler*
> > > > > > to
> > > > > > >>> be
> > > > > > >>>> closer to what we are changing.
> > > > > > >>>> - added the ProducerRecord as the input parameter of the
> > > handle()
> > > > > > >> method
> > > > > > >>> as
> > > > > > >>>> well.
> > > > > > >>>> - increased the response types to 3 to have fail and two
> types
> > > of
> > > > > > >>> continue.
> > > > > > >>>> - The default behaviour is having no custom handler, having
> > the
> > > > > > >>>> corresponding config parameter set to null. Therefore, the
> KIP
> > > > > > provides
> > > > > > >>> no
> > > > > > >>>> default implementation of the interface.
> > > > > > >>>> - We follow the interface solution as described in the
> > > > > > >>>> Rejected Alternetives section.
> > > > > > >>>>
> > > > > > >>>>
> > > > > > >>>> Cheers,
> > > > > > >>>> Alieh
> > > > > > >>>>
> > > > > > >>>>
> > > > > > >>>> On Thu, Apr 18, 2024 at 8:11 PM Matthias J. Sax <
> > > mj...@apache.org
> > > > >
> > > > > > >>> wrote:
> > > > > > >>>>
> > > > > > >>>>> Thanks for the KIP Alieh! It addresses an important case
> for
> > > > error
> > > > > > >>>>> handling.
> > > > > > >>>>>
> > > > > > >>>>> I agree that using this handler would be an expert API, as
> > > > > mentioned
> > > > > > >> by
> > > > > > >>>>> a few people. But I don't think it would be a reason to not
> > add
> > > > it.
> > > > > > >> It's
> > > > > > >>>>> always a tricky tradeoff what to expose to users and to
> avoid
> > > > foot
> > > > > > >> guns,
> > > > > > >>>>> but we added similar handlers to Kafka Streams, and have
> good
> > > > > > >> experience
> > > > > > >>>>> with it. Hence, I understand, but don't share the concern
> > > raised.
> > > > > > >>>>>
> > > > > > >>>>> I also agree that there is some responsibility by the user
> to
> > > > > > >> understand
> > > > > > >>>>> how such a handler should be implemented to not drop data
> by
> > > > > > accident.
> > > > > > >>>>> But it seem unavoidable and acceptable.
> > > > > > >>>>>
> > > > > > >>>>> While I understand that a "simpler / reduced" API (eg via
> > > > configs)
> > > > > > >> might
> > > > > > >>>>> also work, I personally prefer a full handler. Configs have
> > the
> > > > > same
> > > > > > >>>>> issue that they could be miss-used potentially leading to
> > > > > incorrectly
> > > > > > >>>>> dropped data, but at the same time are less flexible (and
> > thus
> > > > > maybe
> > > > > > >>>>> ever harder to use correctly...?). Base on my experience,
> > there
> > > > is
> > > > > > >> also
> > > > > > >>>>> often weird corner case for which it make sense to also
> drop
> > > > > records
> > > > > > >> for
> > > > > > >>>>> other exceptions, and a full handler has the advantage of
> > full
> > > > > > >>>>> flexibility and "absolute power!".
> > > > > > >>>>>
> > > > > > >>>>> To be fair: I don't know the exact code paths of the
> producer
> > > in
> > > > > > >>>>> details, so please keep me honest. But my understanding is,
> > > that
> > > > > the
> > > > > > >> KIP
> > > > > > >>>>> aims to allow users to react to internal exception, and
> > decide
> > > to
> > > > > > keep
> > > > > > >>>>> retrying internally, swallow the error and drop the record,
> > or
> > > > > raise
> > > > > > >> the
> > > > > > >>>>> error?
> > > > > > >>>>>
> > > > > > >>>>> Maybe the KIP would need to be a little bit more precises
> > what
> > > > > error
> > > > > > >> we
> > > > > > >>>>> want to cover -- I don't think this list must be
> exhaustive,
> > as
> > > > we
> > > > > > can
> > > > > > >>>>> always do follow up KIP to also apply the handler to other
> > > errors
> > > > > to
> > > > > > >>>>> expand the scope of the handler. The KIP does mention
> > examples,
> > > > but
> > > > > > it
> > > > > > >>>>> might be good to explicitly state for what cases the
> handler
> > > gets
> > > > > > >>> applied?
> > > > > > >>>>>
> > > > > > >>>>> I am also not sure if CONTINUE and FAIL are enough options?
> > > Don't
> > > > > we
> > > > > > >>>>> need three options? Or would `CONTINUE` have different
> > meaning
> > > > > > >> depending
> > > > > > >>>>> on the type of error? Ie, for a retryable error `CONTINUE`
> > > would
> > > > > mean
> > > > > > >>>>> keep retrying internally, but for a non-retryable error
> > > > `CONTINUE`
> > > > > > >> means
> > > > > > >>>>> swallow the error and drop the record? This semantic
> overload
> > > > seems
> > > > > > >>>>> tricky to reason about by users, so it might better to
> split
> > > > > > >> `CONTINUE`
> > > > > > >>>>> into two cases -> `RETRY` and `SWALLOW` (or some better
> > names).
> > > > > > >>>>>
> > > > > > >>>>> Additionally, should we just ship a
> > > > `DefaultClientExceptionHandler`
> > > > > > >>>>> which would return `FAIL`, for backward compatibility. Or
> > don't
> > > > > have
> > > > > > >> any
> > > > > > >>>>> default handler to begin with and allow it to be `null`? I
> > > don't
> > > > > see
> > > > > > >> the
> > > > > > >>>>> need for a specific `TransactionExceptionHandler`. To me,
> the
> > > > goal
> > > > > > >>>>> should be to not modify the default behavior at all, but to
> > > just
> > > > > > allow
> > > > > > >>>>> users to change the default behavior if there is a need.
> > > > > > >>>>>
> > > > > > >>>>> What is missing on the KIP though it, how the handler is
> > passed
> > > > > into
> > > > > > >> the
> > > > > > >>>>> producer thought? Would we need a new config which allows
> to
> > > set
> > > > a
> > > > > > >>>>> custom handler? And/or would we allow to pass in an
> instance
> > > via
> > > > > the
> > > > > > >>>>> constructor or add a new method to set a handler?
> > > > > > >>>>>
> > > > > > >>>>>
> > > > > > >>>>> -Matthias
> > > > > > >>>>>
> > > > > > >>>>> On 4/18/24 10:02 AM, Andrew Schofield wrote:
> > > > > > >>>>>> Hi Alieh,
> > > > > > >>>>>> Thanks for the KIP.
> > > > > > >>>>>>
> > > > > > >>>>>> Exception handling in the Kafka producer and consumer is
> > > really
> > > > > not
> > > > > > >>>>> ideal.
> > > > > > >>>>>> It’s even harder working out what’s going on with the
> > > consumer.
> > > > > > >>>>>>
> > > > > > >>>>>> I’m a bit nervous about this KIP and I agree with Chris
> that
> > > it
> > > > > > could
> > > > > > >>> do
> > > > > > >>>>> with additional
> > > > > > >>>>>> motivation. This would be an expert-level interface given
> > how
> > > > > > >>> complicated
> > > > > > >>>>>> the exception handling for Kafka has become.
> > > > > > >>>>>>
> > > > > > >>>>>> 7. The application is not really aware of the batching
> being
> > > > done
> > > > > on
> > > > > > >>> its
> > > > > > >>>>> behalf.
> > > > > > >>>>>> The ProduceResponse can actually return an array of
> records
> > > > which
> > > > > > >>> failed
> > > > > > >>>>>> per batch. If you get RecordTooLargeException, and want to
> > > > retry,
> > > > > > you
> > > > > > >>>>> probably
> > > > > > >>>>>> need to remove the offending records from the batch and
> > retry
> > > > it.
> > > > > > >> This
> > > > > > >>>>> is getting fiddly.
> > > > > > >>>>>>
> > > > > > >>>>>> 8. There is already o.a.k.clients.producer.Callback. I
> > wonder
> > > > > > whether
> > > > > > >>> an
> > > > > > >>>>>> alternative might be to add a method to the existing
> > Callback
> > > > > > >>> interface,
> > > > > > >>>>> such as:
> > > > > > >>>>>>
> > > > > > >>>>>>  ClientExceptionResponse onException(Exception exception)
> > > > > > >>>>>>
> > > > > > >>>>>> It would be called when a ProduceResponse contains an
> error,
> > > but
> > > > > the
> > > > > > >>>>>> producer is going to retry. It tells the producer whether
> to
> > > go
> > > > > > ahead
> > > > > > >>>>> with the retry
> > > > > > >>>>>> or not. The default implementation would be to CONTINUE,
> > > because
> > > > > > >> that’s
> > > > > > >>>>>> just continuing to retry as planned. Note that this is a
> > > > > per-record
> > > > > > >>>>> callback, so
> > > > > > >>>>>> the application would be able to understand which records
> > > > failed.
> > > > > > >>>>>>
> > > > > > >>>>>> By using an existing interface, we already know how to
> > > configure
> > > > > it
> > > > > > >> and
> > > > > > >>>>> we know
> > > > > > >>>>>> about the threading model for calling it.
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>> Thanks,
> > > > > > >>>>>> Andrew
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>>> On 17 Apr 2024, at 18:17, Chris Egerton
> > > > <chr...@aiven.io.INVALID
> > > > > >
> > > > > > >>>>> wrote:
> > > > > > >>>>>>>
> > > > > > >>>>>>> Hi Alieh,
> > > > > > >>>>>>>
> > > > > > >>>>>>> Thanks for the KIP! The issue with writing to
> non-existent
> > > > topics
> > > > > > is
> > > > > > >>>>>>> particularly frustrating for users of Kafka Connect and
> has
> > > > been
> > > > > > the
> > > > > > >>>>> source
> > > > > > >>>>>>> of a handful of Jira tickets over the years. My thoughts:
> > > > > > >>>>>>>
> > > > > > >>>>>>> 1. An additional detail we can add to the motivation (or
> > > > possibly
> > > > > > >>>>> rejected
> > > > > > >>>>>>> alternatives) section is that this kind of custom retry
> > logic
> > > > > can't
> > > > > > >> be
> > > > > > >>>>>>> implemented by hand by, e.g., setting retries to 0 in the
> > > > > producer
> > > > > > >>>>> config
> > > > > > >>>>>>> and handling exceptions at the application level. Or
> > rather,
> > > it
> > > > > > can,
> > > > > > >>>>> but 1)
> > > > > > >>>>>>> it's a bit painful to have to reimplement at every
> > call-site
> > > > for
> > > > > > >>>>>>> Producer::send (and any code that awaits the returned
> > Future)
> > > > and
> > > > > > 2)
> > > > > > >>>>> it's
> > > > > > >>>>>>> impossible to do this without losing idempotency on
> > retries.
> > > > > > >>>>>>>
> > > > > > >>>>>>> 2. That said, I wonder if a pluggable interface is really
> > the
> > > > > right
> > > > > > >>> call
> > > > > > >>>>>>> here. Documenting the interactions of a producer with
> > > > > > >>>>>>> a ClientExceptionHandler instance will be tricky, and
> > > > > implementing
> > > > > > >>> them
> > > > > > >>>>>>> will also be a fair amount of work. I believe that there
> > > needs
> > > > to
> > > > > > be
> > > > > > >>>>> some
> > > > > > >>>>>>> more granularity for how writes to non-existent topics
> (or
> > > > > really,
> > > > > > >>>>>>> UNKNOWN_TOPIC_OR_PARTITION and related errors from the
> > > broker)
> > > > > are
> > > > > > >>>>> handled,
> > > > > > >>>>>>> but I'm torn between keeping it simple with maybe one or
> > two
> > > > new
> > > > > > >>>>> producer
> > > > > > >>>>>>> config properties, or a full-blown pluggable interface.
> If
> > > > there
> > > > > > are
> > > > > > >>>>> more
> > > > > > >>>>>>> cases that would benefit from a pluggable interface, it
> > would
> > > > be
> > > > > > >> nice
> > > > > > >>> to
> > > > > > >>>>>>> identify these and add them to the KIP to strengthen the
> > > > > > motivation.
> > > > > > >>>>> Right
> > > > > > >>>>>>> now, I'm not sure the two that are mentioned in the
> > > motivation
> > > > > are
> > > > > > >>>>>>> sufficient.
> > > > > > >>>>>>>
> > > > > > >>>>>>> 3. Alternatively, a possible compromise is for this KIP
> to
> > > > > > introduce
> > > > > > >>> new
> > > > > > >>>>>>> properties that dictate how to handle
> > unknown-topic-partition
> > > > and
> > > > > > >>>>>>> record-too-large errors, with the thinking that if we
> > > > introduce a
> > > > > > >>>>> pluggable
> > > > > > >>>>>>> interface later on, these properties will be recognized
> by
> > > the
> > > > > > >> default
> > > > > > >>>>>>> implementation of that interface but could be completely
> > > > ignored
> > > > > or
> > > > > > >>>>>>> replaced by alternative implementations.
> > > > > > >>>>>>>
> > > > > > >>>>>>> 4. (Nit) You can remove the "This page is meant as a
> > template
> > > > for
> > > > > > >>>>> writing a
> > > > > > >>>>>>> KIP..." part from the KIP. It's not a template anymore :)
> > > > > > >>>>>>>
> > > > > > >>>>>>> 5. If we do go the pluggable interface route, wouldn't we
> > > want
> > > > to
> > > > > > >> add
> > > > > > >>>>> the
> > > > > > >>>>>>> possibility for retry logic? The simplest version of this
> > > could
> > > > > be
> > > > > > >> to
> > > > > > >>>>> add a
> > > > > > >>>>>>> RETRY value to the ClientExceptionHandlerResponse enum.
> > > > > > >>>>>>>
> > > > > > >>>>>>> 6. I think "SKIP" or "DROP" might be clearer instead of
> > > > > "CONTINUE"
> > > > > > >> for
> > > > > > >>>>>>> the ClientExceptionHandlerResponse enum, since they cause
> > > > records
> > > > > > to
> > > > > > >>> be
> > > > > > >>>>>>> dropped.
> > > > > > >>>>>>>
> > > > > > >>>>>>> Cheers,
> > > > > > >>>>>>>
> > > > > > >>>>>>> Chris
> > > > > > >>>>>>>
> > > > > > >>>>>>> On Wed, Apr 17, 2024 at 12:25 PM Justine Olshan
> > > > > > >>>>>>> <jols...@confluent.io.invalid> wrote:
> > > > > > >>>>>>>
> > > > > > >>>>>>>> Hey Alieh,
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> I echo what Omnia says, I'm not sure I understand the
> > > > > implications
> > > > > > >> of
> > > > > > >>>>> the
> > > > > > >>>>>>>> change and I think more detail is needed.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> This comment also confused me a bit:
> > > > > > >>>>>>>> * {@code ClientExceptionHandler} that continues the
> > > > transaction
> > > > > > >> even
> > > > > > >>>>> if a
> > > > > > >>>>>>>> record is too large.
> > > > > > >>>>>>>> * Otherwise, it makes the transaction to fail.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Relatedly, I've been working with some folks on a KIP
> for
> > > > > > >>> transactions
> > > > > > >>>>>>>> errors and how they are handled. Specifically for the
> > > > > > >>>>>>>> RecordTooLargeException (and a few other errors), we
> want
> > to
> > > > > give
> > > > > > a
> > > > > > >>> new
> > > > > > >>>>>>>> error category for this error that allows the
> application
> > to
> > > > > > choose
> > > > > > >>>>> how it
> > > > > > >>>>>>>> is handled. Maybe this KIP is something that you are
> > looking
> > > > > for?
> > > > > > >>> Stay
> > > > > > >>>>>>>> tuned :)
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Justine
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> On Wed, Apr 17, 2024 at 8:03 AM Omnia Ibrahim <
> > > > > > >>> o.g.h.ibra...@gmail.com
> > > > > > >>>>>>
> > > > > > >>>>>>>> wrote:
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>> Hi Alieh,
> > > > > > >>>>>>>>> Thanks for the KIP! I have couple of comments
> > > > > > >>>>>>>>> - You mentioned in the KIP motivation,
> > > > > > >>>>>>>>>> Another example for which a production exception
> handler
> > > > could
> > > > > > be
> > > > > > >>>>>>>> useful
> > > > > > >>>>>>>>> is if a user tries to write into a non-existing topic,
> > > which
> > > > > > >>> returns a
> > > > > > >>>>>>>>> retryable error code; with infinite retries, the
> producer
> > > > would
> > > > > > >> hang
> > > > > > >>>>>>>>> retrying forever. A handler could help to break the
> > > infinite
> > > > > > retry
> > > > > > >>>>> loop.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> How the handler can differentiate between something
> that
> > is
> > > > > > >>> temporary
> > > > > > >>>>> and
> > > > > > >>>>>>>>> it should keep retrying and something permanent like
> > forgot
> > > > to
> > > > > > >>> create
> > > > > > >>>>> the
> > > > > > >>>>>>>>> topic? temporary here could be
> > > > > > >>>>>>>>> the producer get deployed before the topic creation
> > finish
> > > > > > >>> (specially
> > > > > > >>>>> if
> > > > > > >>>>>>>>> the topic creation is handled via IaC)
> > > > > > >>>>>>>>> temporary offline partitions
> > > > > > >>>>>>>>> leadership changing
> > > > > > >>>>>>>>>       Isn’t this putting the producer at risk of
> dropping
> > > > > records
> > > > > > >>>>>>>>> unintentionally?
> > > > > > >>>>>>>>> - Can you elaborate more on what is written in the
> > > > > compatibility
> > > > > > /
> > > > > > >>>>>>>>> migration plan section please by explaining in bit more
> > > > details
> > > > > > >> what
> > > > > > >>>>> is
> > > > > > >>>>>>>> the
> > > > > > >>>>>>>>> changing behaviour and how this will impact client who
> > are
> > > > > > >>> upgrading?
> > > > > > >>>>>>>>> - In the proposal changes can you elaborate in the KIP
> > > where
> > > > in
> > > > > > >> the
> > > > > > >>>>>>>>> producer lifecycle will ClientExceptionHandler and
> > > > > > >>>>>>>>> TransactionExceptionHandler get triggered, and how will
> > the
> > > > > > >> producer
> > > > > > >>>>>>>>> configure them to point to costumed implementation.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> Thanks
> > > > > > >>>>>>>>> Omnia
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>>> On 17 Apr 2024, at 13:13, Alieh Saeedi
> > > > > > >>> <asae...@confluent.io.INVALID
> > > > > > >>>>>>
> > > > > > >>>>>>>>> wrote:
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> Hi all,
> > > > > > >>>>>>>>>> Here is the KIP-1038: Add Custom Error Handler to
> > > Producer.
> > > > > > >>>>>>>>>> <
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>
> > > > > > >>>
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1038%3A+Add+Custom+Error+Handler+to+Producer
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> I look forward to your feedback!
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> Cheers,
> > > > > > >>>>>>>>>> Alieh
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>
> > > > > > >>>
> > > > > > >>>
> > > > > > >>
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to