My concern with respect to it being fragile: the code that enforces the
error type is internal to the producer. Someone may see it and say, "I want
to add such-and-such error. This looks like internal code, so I don't need
a KIP," and then change it to whatever they want, thinking that is
consistent with the usual Kafka Improvement Proposal process.

Relying on an internal check to enforce an external API is fragile in my
opinion. That's why I sort of agreed with Artem on enforcing the error in
the method signature -- part of the public API.
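
To make the signature idea concrete, here is a minimal sketch. Every name in
it (TypedHandlerSketch, onRecordTooLarge, onUnknownTopicOrPartition, the two
response enums, the "metrics" topic) is hypothetical and not taken from the
KIP or from kafka-clients, and the two exception classes are local stand-ins
so the example compiles without a Kafka dependency. The point is only that
covering a new exception would require a new public method, and that RETRY
cannot even be expressed for a non-retriable error.

```java
// Hypothetical sketch: these types are NOT part of kafka-clients or the KIP.
// Local stand-in exception classes keep the example self-contained.
class TypedHandlerSketch {
    static class RecordTooLargeException extends RuntimeException {}
    static class UnknownTopicOrPartitionException extends RuntimeException {}

    // A non-retriable error cannot be answered with RETRY: the enum has no such value.
    enum NonRetriableResponse { FAIL, SWALLOW }
    enum RetriableResponse { FAIL, RETRY, SWALLOW }

    // One method per covered exception. Supporting a new exception means adding
    // a new method here, which is a visible change to the public API.
    interface ProducerExceptionHandler {
        default NonRetriableResponse onRecordTooLarge(String topic, RecordTooLargeException e) {
            return NonRetriableResponse.FAIL; // conservative default chosen for this sketch
        }
        default RetriableResponse onUnknownTopicOrPartition(String topic, UnknownTopicOrPartitionException e) {
            return RetriableResponse.FAIL; // conservative default chosen for this sketch
        }
    }

    // Example user handler: drop oversized records on a hypothetical "metrics" topic only.
    static class DropOversizedMetrics implements ProducerExceptionHandler {
        @Override
        public NonRetriableResponse onRecordTooLarge(String topic, RecordTooLargeException e) {
            return "metrics".equals(topic) ? NonRetriableResponse.SWALLOW : NonRetriableResponse.FAIL;
        }
    }

    public static void main(String[] args) {
        ProducerExceptionHandler h = new DropOversizedMetrics();
        System.out.println(h.onRecordTooLarge("metrics", new RecordTooLargeException()));
        System.out.println(h.onRecordTooLarge("orders", new RecordTooLargeException()));
        System.out.println(h.onUnknownTopicOrPartition("orders", new UnknownTopicOrPartitionException()));
    }
}
```

With this shape, a handler written against today's two methods cannot
silently swallow an exception type that is added later, because the new type
would arrive through a method the handler has not overridden.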

Chris's comments on providing more information to the handler again make me
wonder if we are solving a problem of lack of information at the
application level with a more powerful solution than we need. (I.e., if we
had more information, could the application close and restart the
transaction rather than having to drop records?) But I am happy to
compromise with a handler that we can agree is sufficiently controlled and
documented.

Justine

On Wed, May 8, 2024 at 7:20 AM Chris Egerton <chr...@aiven.io.invalid>
wrote:

> Hi Alieh,
>
> Continuing prior discussions:
>
> 1) Regarding the "flexibility" discussion, my overarching point is that I
> don't see the point in allowing for this kind of pluggable logic without
> also covering more scenarios. Take example 2 in the KIP: if we're going to
> implement retries only on "important" topics when a topic partition isn't
> found, why wouldn't we also want to be able to do this for other errors?
> Again, taking authorization errors as an example, why wouldn't we want to
> be able to fail when we can't write to "important" topics because the
> producer principal lacks sufficient ACLs, and drop the record if the topic
> isn't "important"? In a security-conscious environment with
> runtime-dependent topic routing (which is a common feature of many source
> connectors, such as the Debezium connectors), this seems fairly likely.
>
> 2) As far as changing the shape of the API goes, I like Artem's idea of
> splitting out the interface based on specific exceptions. This may be a
> little laborious to expand in the future, but if we really want to
> limit the exceptions that we cover with the handler and move slowly and
> cautiously, then IMO it'd be reasonable to reflect that in the interface. I
> also acknowledge that there's no way to completely prevent people from
> shooting themselves in the foot by implementing the API incorrectly, but I
> think it's worth it to do what we can--including leveraging the Java
> language's type system--to help them, so IMO there's value to eliminating
> the implicit behavior of failing when a policy returns RETRY for a
> non-retriable error. This can take a variety of shapes and I'm not going to
> insist on anything specific, but I do want to again raise my concerns with
> the current proposal and request that we find something a little better.
>
> 3) Concerning the default implementation--actually, I meant what I wrote :)
> I don't want a "second" default, I want an implementation of this interface
> to be used as the default if no others are specified. The behavior of this
> default implementation would be identical to existing behavior (so there
> would be no backwards compatibility concerns like the ones raised by
> Matthias), but it would be possible to configure this default handler class
> to behave differently for a basic set of scenarios. This would mirror (pun
> intended) the approach we've taken with Mirror Maker 2 and its
> ReplicationPolicy interface [1]. There is a default implementation
> available [2] that recognizes a handful of basic configuration properties
> [3] for simple tweaks, but if users want, they can also implement their own
> replication policy for more fine-grained logic if those properties aren't
> flexible enough.
>
> More concretely, I'm imagining something like this for the producer
> exception handler:
>
> - Default implementation class
> of org.apache.kafka.clients.producer.DefaultProducerExceptionHandler
> - This class would recognize two properties:
>   - drop.invalid.large.records: Boolean property, defaults to false. If
> "false", then causes the handler to return FAIL whenever
> a RecordTooLargeException is encountered; if "true", then causes
> SWALLOW/SKIP/DROP to be returned instead
>   - unknown.topic.partition.retry.timeout.ms: Integer property, defaults
> to
> INT_MAX. Whenever an UnknownTopicOrPartitionException is encountered,
> causes the handler to return FAIL if that record has been pending for more
> than the retry timeout; otherwise, causes RETRY to be returned
>
> I think this is worth addressing now instead of later because it forces us
> to evaluate the usefulness of this interface and it addresses a
> long-standing issue not just with Kafka Connect, but with the Java producer
> in general. For reference, here are a few tickets I collected after briefly
> skimming our Jira showing that this is a real pain point for users:
> https://issues.apache.org/jira/browse/KAFKA-10340,
> https://issues.apache.org/jira/browse/KAFKA-12990,
> https://issues.apache.org/jira/browse/KAFKA-13634. Although this is
> frequently reported with Kafka Connect, it applies to anyone who configures
> a producer to use a high retry timeout. I am aware of the max.block.ms
> property, but it's painful and IMO poor behavior to require users to reduce
> the value of this property just to handle the single scenario when trying
> to write to topics that don't exist, since it would also limit the retry
> timeout for other operations that are legitimately retriable.
>
> Raising new points:
>
> 5) I don't see the interplay between this handler and existing
> retry-related properties mentioned anywhere in the KIP. I'm assuming that
> properties like "retries", "max.block.ms", and "delivery.timeout.ms" would
> take precedence over the handler and once they are exhausted, the
> record/batch will fail no matter what? If so, it's probably worth briefly
> mentioning this (no more than a sentence or two) in the KIP, and if not,
> I'm curious what you have in mind.
>
> 6) I also wonder if the API provides enough information in its current
> form. Would it be possible to provide handlers with some way of tracking
> how long a record has been pending for (i.e., how long it's been since the
> record was provided as an argument to Producer::send)? One way to do this
> could be to add a method like `onNewRecord(ProducerRecord)` and
> allow/require the handler to do its own bookkeeping, probably with a
> matching `onRecordSuccess(ProducerRecord)` method so that the handler
> doesn't eat up an ever-increasing amount of memory trying to track records.
> An alternative could be to include information about the initial time the
> record was received by the producer and the number of retries that have
> been performed for it as parameters in the handle method(s), but I'm not
> sure how easy this would be to implement and it might clutter things up a
> bit too much.
>
> 7) A small request--can we add Closeable (or, if you prefer, AutoCloseable)
> as a superinterface for the handler interface?
>
> [1] -
>
> https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/ReplicationPolicy.html
> [2] -
>
> https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html
> [3] -
>
> https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html#SEPARATOR_CONFIG
> ,
>
> https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html#INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG
>
> Cheers,
>
> Chris
>
> On Wed, May 8, 2024 at 12:37 AM Matthias J. Sax <mj...@apache.org> wrote:
>
> > Very interesting discussion.
> >
> > Seems a central point is the question about "how generic" we approach
> > this, and some people think we need to be conservative and others think
> > we should try to be as generic as possible.
> >
> > Personally, I think following a limited scope for this KIP (by
> > explicitly saying we only cover RecordTooLarge and
> > UnknownTopicOrPartition) might be better. We have concrete tickets that
> > we address, while for other exceptions (like authorization) we don't know
> > if people want to handle it to begin with. Boiling the ocean might not
> > get us too far, and being somewhat pragmatic might help to move this KIP
> > forward. -- I also agree with Justine and Artem that we want to be
> > careful anyway to not allow users to break stuff too easily.
> >
> > At the same time, I agree that we should set up this change in a
> > forward-looking way, and thus having a single generic handler allows us to
> > later extend the handler more easily. This should also simplify follow-up KIPs
> > that might add new error cases (I actually mentioned one more to Alieh
> > already, but we both agreed that it might be best to exclude it from the
> > KIP right now, to make the 3.8 deadline. Doing a follow up KIP is not
> > the end of the world.)
> >
> >
> >
> > @Chris:
> >
> > (2) This sounds fair to me, but not sure how "bad" it actually would be?
> > If the contract is clearly defined, it seems to be fine what the KIP
> > proposes, and given that such a handler is an expert API, and we can
> > provide "best practices" (cf my other comment below in [AL1]), being a
> > little bit pragmatic sounds fine to me.
> >
> > Not sure if I understand Justine's argument on this question?
> >
> >
> > (3) About having a default impl or not. I am fine with adding one, even
> > if I am not sure why Connect could not just add its own one and plug it
> > in (and we would add corresponding configs for Connect, but not for the
> > producer itself)? For this case, we could also do this as a follow up
> > KIP, but happy to include it in this KIP to provide value to Connect
> > right away (even if the value might not come right away if we miss the
> > 3.8 deadline due to expanded KIP scope...) --  For KS, we would for sure
> > plug in our own impl, and lock down the config such that users cannot set
> > their own handler on the internal producer to begin with. Might be good
> > to elaborate why the producer should have a default? We might actually
> > want to add this to the KIP right away?
> >
> > The key for a default impl would be, to not change the current behavior,
> > and having no default seems to achieve this. For the two cases you
> > mentioned, it's unclear to me what default value on "upper bound on
> > retries" for UnknownTopicOrPartitionException we should set? Seems it
> > would need to be the same as `delivery.timeout.ms`? However, if users
> > have `delivery.timeout.ms` actually overwritten we would need to set
> > this config somewhat "dynamic"? Is this feasible? If we hard-code 2
> > minutes, it might not be backward compatible. I have the impression we
> > might introduce some undesired coupling? -- For the "record too large"
> > case, the config seems to be boolean and setting it to `false` by
> > default seems to provide backward compatibility.
> >
> >
> >
> > @Artem:
> >
> > [AL1] While I see the point, I would think having a different callback
> > for every exception might not really be elegant? In the end, the handler
> > is a very advanced feature anyway, and if it's implemented in a bad
> > way, well, it's a user error -- we cannot protect users from everything.
> > To me, a handler like this is to some extent "business logic" and if a
> > user gets business logic wrong, it's hard to protect them. -- We would
> > of course provide best practice guidance in the JavaDocs, and explain
> > that a handler should have explicit `if` statements for stuff it wants to
> > handle, and only a single default which returns FAIL.
> >
> >
> > [AL2] Yes, but for KS we would retry at the application layer. Ie, the
> > TX is not completed, we clean up and set up our task from scratch, to
> > ensure the pending transaction is completed before we resume. If the TX
> > was indeed aborted, we would retry from older offset and thus just hit
> > the same error again and the loop begins again.
> >
> >
> > [AL2 cont.] Similar to AL1, I see such a handler to some extent as
> > business logic. If a user puts a bad filter condition in their KS app,
> > and drops messages, there is nothing we can do about it, and this handler
> > IMHO, has a similar purpose. This is also the line of thinking I apply
> > to EOS, to address Justine's concern about "should we allow to drop for
> > EOS", and my answer is "yes", because it's more business logic than
> > actual error handling IMHO. And by default, we fail... So users opt-in
> > to add business logic to drop records. It's an application level
> > decision how to write the code.
> >
> >
> > [AL3] Maybe I misunderstand what you are saying, but to me, checking the
> > size of the record upfront is exactly what the KIP proposes? No?
> >
> >
> >
> > @Justine:
> >
> > > I saw the sample
> > > code -- is it just an if statement checking for the error before the
> > > handler is invoked? That seems a bit fragile.
> >
> > What do you mean by fragile? Not sure if I see your point.
> >
> >
> >
> >
> > -Matthias
> >
> > On 5/7/24 5:33 PM, Artem Livshits wrote:
> > > Hi Alieh,
> > >
> > > Thanks for the KIP.  The motivation talks about very specific cases,
> but
> > > the interface is generic.
> > >
> > > [AL1]
> > > If the interface evolves in the future I think we could have the
> > following
> > > confusion:
> > >
> > > 1. A user implemented SWALLOW action for both RecordTooLargeException
> and
> > > > UnknownTopicOrPartitionException.  For simplicity they just return
> SWALLOW
> > > from the function, because it elegantly handles all known cases.
> > > 2. The interface has evolved to support a new exception.
> > > 3. The user has upgraded their Kafka client.
> > >
> > > Now a new kind of error gets dropped on the floor without user's
> > intention
> > > and it would be super hard to detect and debug.
> > >
> > > To avoid the confusion, I think we should use handlers for specific
> > > exceptions.  Then if a new exception is added it won't get silently
> > > swallowed
> > > because the user would need to add new functionality to handle it.
> > >
> > > I also have some higher level comments:
> > >
> > > [AL2]
> > >> it throws a TimeoutException, and the user can only blindly retry,
> which
> > > may result in an infinite retry loop
> > >
> > > If the TimeoutException happens during transactional processing
> (exactly
> > > > once is the desired semantics), then the client should not retry when
> it
> > > gets TimeoutException because without knowing the reason for
> > > TimeoutExceptions, the client cannot know whether the message got
> > actually
> > > > produced or not and retrying the message may result in duplicates.
> > >
> > >> The thrown TimeoutException "cuts" the connection to the underlying
> root
> > > cause of missing metadata
> > >
> > > Maybe we should fix the error handling and return the proper underlying
> > > message?  Then the application can properly handle the message based on
> > > preferences.
> > >
> > >  From the product perspective, it's not clear how safe it is to blindly
> > > ignore UnknownTopicOrPartitionException.  This could lead to situations
> > > when a simple typo could lead to massive data loss (part of the data
> > would
> > > effectively be produced to a "black hole" and the user may not notice
> it
> > > for a while).
> > >
> > > In which situations would you recommend the user to "black hole"
> messages
> > > in case of misconfiguration?
> > >
> > > [AL3]
> > >
> > >> If the custom handler decides on SWALLOW for RecordTooLargeException,
> > >
> > > Is it my understanding that this KIP proposes that functionality that
> > would
> > > only be able to SWALLOW RecordTooLargeException that happen because the
> > > producer cannot produce the record (if the broker rejects the batch,
> the
> > > error won't get to the handler, because we cannot know which other
> > records
> > > get ignored).  In this case, why not just check the locally configured
> > max
> > > > record size upfront and not produce the record in the first place?
> > Maybe
> > > we can expose a validation function from the producer that could
> validate
> > > the records locally, so we don't need to produce the record in order to
> > > know that it's invalid.
> > >
> > > -Artem
> > >
> > > On Tue, May 7, 2024 at 2:07 PM Justine Olshan
> > <jols...@confluent.io.invalid>
> > > wrote:
> > >
> > >> Alieh and Chris,
> > >>
> > >> Thanks for clarifying 1) but I saw the motivation. I guess I just
> didn't
> > >> understand how that would be ensured on the producer side. I saw the
> > sample
> > >> code -- is it just an if statement checking for the error before the
> > >> handler is invoked? That seems a bit fragile.
> > >>
> > >> Can you clarify what you mean by `since the code does not reach the KS
> > >> interface and breaks somewhere in producer.` If we surfaced this error
> > to
> > >> the application in a better way would that also be a solution to the
> > issue?
> > >>
> > >> Justine
> > >>
> > >> On Tue, May 7, 2024 at 1:55 PM Alieh Saeedi
> > <asae...@confluent.io.invalid>
> > >> wrote:
> > >>
> > >>> Hi,
> > >>>
> > >>>
> > >>> Thank you, Chris and Justine, for the feedback.
> > >>>
> > >>>
> > >>> @Chris
> > >>>
> > >>> 1) Flexibility: it has two meanings. The first meaning is the one you
> > >>> mentioned. We are going to cover more exceptions in the future, but
> as
> > >>> Justine mentioned, we must be very conservative about adding more
> > >>> exceptions. Additionally, flexibility mainly means that the user is
> > able
> > >> to
> > >>> develop their own code. As mentioned in the motivation section and
> the
> > >>> examples, sometimes the user decides on dropping a record based on
> the
> > >>> topic, for example.
> > >>>
> > >>>
> > >>> 2) Defining two separate methods for retriable and non-retriable
> > >>> exceptions: although the idea is brilliant, the user may still make a
> > > >>> mistake by implementing the wrong method and see unexpected
> > >> behaviour.
> > >>> For example, he may implement handleRetriable() for
> > >> RecordTooLargeException
> > >>> and define SWALLOW for the exception, but in practice, he sees no
> > change
> > >> in
> > >>> default behaviour since he implemented the wrong method. I think we
> can
> > >>> never reduce the user’s mistakes to 0.
> > >>>
> > >>>
> > >>>
> > >>> 3) Default implementation for Handler: the default behaviour is
> already
> > >>> preserved with NO need of implementing any handler or setting the
> > >>> corresponding config parameter `custom.exception.handler`. What you
> > mean
> > >> is
> > >>> actually having a second default, which requires having both
> interface
> > >> and
> > >>> config parameters. About UnknownTopicOrPartitionException: the
> producer
> > >>> already offers the config parameter `max.block.ms` which determines
> > the
> > >>> duration of retrying. The main purpose of the user who needs the
> > handler
> > >> is
> > >>> to get the root cause of TimeoutException and handle it in the way he
> > >>> intends. The KIP explains the necessity of it for KS users.
> > >>>
> > >>>
> > >>> 4) Naming issue: By SWALLOW, we meant actually swallow the error,
> while
> > >>> SKIP means skip the record, I think. If it makes sense for more ppl,
> I
> > >> can
> > >>> change it to SKIP
> > >>>
> > >>>
> > >>> @Justine
> > >>>
> > >>> 1) was addressed by Chris.
> > >>>
> > >>> 2 and 3) The problem is exactly what you mentioned. Currently, there
> is
> > >> no
> > >>> way to handle these issues application-side. Even KS users who
> > implement
> > >> KS
> > >>> ProductionExceptionHandler are not able to handle the exceptions as
> > they
> > >>> intend since the code does not reach the KS interface and breaks
> > >> somewhere
> > >>> in producer.
> > >>>
> > >>> Cheers,
> > >>> Alieh
> > >>>
> > >>> On Tue, May 7, 2024 at 8:43 PM Chris Egerton <
> fearthecel...@gmail.com>
> > >>> wrote:
> > >>>
> > >>>> Hi Justine,
> > >>>>
> > >>>> The method signatures for the interface are indeed open-ended, but
> the
> > >>> KIP
> > >>>> states that its uses will be limited. See the motivation section:
> > >>>>
> > >>>>> We believe that the user should be able to develop custom exception
> > >>>> handlers for managing producer exceptions. On the other hand, this
> > will
> > >>> be
> > >>>> an expert-level API, and using that may result in strange behaviour
> in
> > >>> the
> > >>>> system, making it hard to find the root cause. Therefore, the custom
> > >>>> handler is currently limited to handling RecordTooLargeException and
> > >>>> UnknownTopicOrPartitionException.
> > >>>>
> > >>>> Cheers,
> > >>>>
> > >>>> Chris
> > >>>>
> > >>>>
> > >>>> On Tue, May 7, 2024, 14:37 Justine Olshan
> > <jols...@confluent.io.invalid
> > >>>
> > >>>> wrote:
> > >>>>
> > >>>>> Hi Alieh,
> > >>>>>
> > >>>>> I was out for KSB and then was also sick. :(
> > >>>>>
> > >>>>> To your point 1) Chris, I don't think it is limited to two specific
> > >>>>> scenarios, since the interface accepts a generic Exception e and
> can
> > >> be
> > >>>>> implemented to check if that e is an instanceof any exception. I
> > >> didn't
> > >>>> see
> > >>>>> anywhere that specific errors are enforced. I'm a bit concerned
> about
> > >>>> this
> > > >>>>> actually. I'm concerned about the open-endedness and the contract
> > >> we
> > >>>> have
> > >>>>> with transactions. We are allowing the client to make decisions
> that
> > >>> are
> > >>>>> somewhat invisible to the server. As an aside, can we build in log
> > >>>> messages
> > > >>>>> when the handler decides to skip, etc., a message? I'm really
> concerned
> > >>>> about
> > >>>>> messages being silently dropped.
> > >>>>>
> > >>>>> I do think Chris's point 2) about retriable vs non retriable errors
> > >> is
> > > >>>>> fair. I'm a bit concerned about skipping an unknown topic or
> partition
> > >>>>> exception too early, as there are cases where it can be transient.
> > >>>>>
> > >>>>> I'm still a little bit wary of allowing dropping records as part of
> > >> EOS
> > >>>>> generally as in many cases, these errors signify an issue with the
> > >>>> original
> > >>>>> data. I understand that streams and connect/mirror maker may have
> > >>> reasons
> > >>>>> they want to progress past these messages, but wondering if there
> is
> > >> a
> > >>>> way
> > >>>>> that can be done application-side. I'm willing to accept this sort
> of
> > >>>>> proposal if we can make it clear that this sort of thing is
> happening
> > >>> and
> > >>>>> we limit the blast radius for what we can do.
> > >>>>>
> > >>>>> Justine
> > >>>>>
> > >>>>> On Tue, May 7, 2024 at 9:55 AM Chris Egerton
> <chr...@aiven.io.invalid
> > >>>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> Hi Alieh,
> > >>>>>>
> > >>>>>> Sorry for the delay, I've been out sick. I still have some
> thoughts
> > >>>> that
> > >>>>>> I'd like to see addressed before voting.
> > >>>>>>
> > >>>>>> 1) If flexibility is the motivation for a pluggable interface, why
> > >>> are
> > >>>> we
> > >>>>>> only limiting the uses for this interface to two very specific
> > >>>> scenarios?
> > >>>>>> Why not also allow, e.g., authorization errors to be handled as
> > >> well
> > >>>>>> (allowing users to drop records destined for some off-limits
> > >> topics,
> > >>> or
> > >>>>>> retry for a limited duration in case there's a delay in the
> > >>> propagation
> > >>>>> of
> > >>>>>> ACL updates)? It'd be nice to see some analysis of other errors
> > >> that
> > >>>>> could
> > >>>>>> be handled with this new API, both to avoid the follow-up work of
> > >>>> another
> > >>>>>> KIP to address them in the future, and to make sure that we're not
> > >>>>> painting
> > >>>>>> ourselves into a corner with the current API in a way that would
> > >> make
> > >>>>>> future modifications difficult.
> > >>>>>>
> > >>>>>> 2) Something feels a bit off with how retriable vs. non-retriable
> > >>>> errors
> > >>>>>> are handled with the interface. Why not introduce two separate
> > >>> methods
> > >>>> to
> > >>>>>> handle each case separately? That way there's no ambiguity or
> > >>> implicit
> > >>>>>> behavior when, e.g., attempting to retry on a
> > >>> RecordTooLargeException.
> > >>>>> This
> > >>>>>> could be something like `NonRetriableResponse
> > >> handle(ProducerRecord,
> > >>>>>> Exception)` and `RetriableResponse handleRetriable(ProducerRecord,
> > >>>>>> Exception)`, though the exact names and shape can obviously be
> > >> toyed
> > >>>>> with a
> > >>>>>> bit.
> > >>>>>>
> > >>>>>> 3) Although the flexibility of a pluggable interface may benefit
> > >> some
> > >>>>>> users' custom producer applications and Kafka Streams
> applications,
> > >>> it
> > >>>>>> comes at significant deployment cost for other low-/no-code
> > >>>> environments,
> > >>>>>> including but not limited to Kafka Connect and MirrorMaker 2. Can
> > >> we
> > >>>> add
> > >>>>> a
> > >>>>>> default implementation of the exception handler that allows for
> > >> some
> > >>>>> simple
> > >>>>>> behavior to be tweaked via configuration property? Two things that
> > >>>> would
> > >>>>> be
> > >>>>>> nice to have would be A) an upper bound on the retry time for
> > >>>>>> unknown-topic-partition exceptions and B) an option to drop
> records
> > >>>> that
> > >>>>>> are large enough to trigger a record-too-large exception.
> > >>>>>>
> > >>>>>> 4) I'd still prefer to see "SKIP" or "DROP" instead of the
> proposed
> > >>>>>> "SWALLOW" option, which IMO is opaque and non-obvious, especially
> > >>> when
> > >>>>>> trying to guess the behavior for retriable errors.
> > >>>>>>
> > >>>>>> Cheers,
> > >>>>>>
> > >>>>>> Chris
> > >>>>>>
> > >>>>>> On Fri, May 3, 2024 at 11:23 AM Alieh Saeedi
> > >>>>> <asae...@confluent.io.invalid
> > >>>>>>>
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>>> Hi all,
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> A summary of the KIP and the discussions:
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> The KIP introduces a handler interface for Producer in order to
> > >>>> handle
> > >>>>>> two
> > >>>>>>> exceptions: RecordTooLargeException and
> > >>>>> UnknownTopicOrPartitionException.
> > >>>>>>> The handler handles the exceptions per-record.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> - Do we need this handler?  [Motivation and Examples sections]
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> RecordTooLargeException: 1) In transactions, the producer
> > >> collects
> > >>>>>> multiple
> > >>>>>>> records in batches. Then a RecordTooLargeException related to a
> > >>>> single
> > >>>>>>> record leads to failing the entire batch. A custom exception
> > >>> handler
> > >>>> in
> > >>>>>>> this case may decide on dropping the record and continuing the
> > >>>>>> processing.
> > > >>>>>>> See Example 1, please. 2) Moreover, in Kafka Streams, a record
> > >>> that
> > >>>> is
> > >>>>>> too
> > >>>>>>> large is a poison pill record, and there is no way to skip over
> > >>> it. A
> > >>>>>>> handler would allow us to react to this error inside the
> > >> producer,
> > >>>>> i.e.,
> > >>>>>>> local to where the error happens, and thus simplify the overall
> > >>> code
> > >>>>>>> significantly. Please read the Motivation section for more
> > >>>> explanation.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> UnknownTopicOrPartitionException: For this case, the producer
> > >>> handles
> > >>>>>> this
> > >>>>>>> exception internally and only issues a WARN log about missing
> > >>>> metadata
> > >>>>>> and
> > >>>>>>> retries internally. Later, when the producer hits "
> > > >>>> delivery.timeout.ms"
> > >>>>>> it
> > >>>>>>> throws a TimeoutException, and the user can only blindly retry,
> > >>> which
> > >>>>> may
> > >>>>>>> result in an infinite retry loop. The thrown TimeoutException
> > >>> "cuts"
> > >>>>> the
> > >>>>>>> connection to the underlying root cause of missing metadata
> > >> (which
> > >>>>> could
> > >>>>>>> indeed be a transient error but is persistent for a non-existing
> > >>>>> topic).
> > >>>>>>> Thus, there is no programmatic way to break the infinite retry
> > >>> loop.
> > >>>>>> Kafka
> > >>>>>>> Streams also blindly retries for this case, and the application
> > >>> gets
> > >>>>>> stuck.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> - Having interface vs configuration option: [Motivation,
> > >> Examples,
> > >>>> and
> > >>>>>>> Rejected Alternatives sections]
> > >>>>>>>
> > >>>>>>> Our solution is introducing an interface due to the full
> > >>> flexibility
> > >>>>> that
> > >>>>>>> it offers. Sometimes users, especially Kafka Streams ones,
> > >>> determine
> > >>>>> the
> > > >>>>>>> handler's behaviour based on the situation. For example,
> > > >>>>>>> facing UnknownTopicOrPartitionException, the user may want to
> > >>> raise
> > >>>> an
> > >>>>>>> error for some topics but retry it for other topics. Having a
> > >>>>>> configuration
> > >>>>>>> option with a fixed set of possibilities does not serve the
> > >> user's
> > >>>>>>> needs. See Example 2, please.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> - Note on RecordTooLargeException: [Public Interfaces section]
> > >>>>>>>
> > >>>>>>> If the custom handler decides on SWALLOW for
> > >>> RecordTooLargeException,
> > >>>>>> then
> > >>>>>>> this record will not be a part of the batch of transactions and
> > >>> will
> > >>>>> also
> > >>>>>>> not be sent to the broker in non-transactional mode. So no
> > >> worries
> > >>>>> about
> > >>>>>>> getting a RecordTooLargeException from the broker in this case,
> > >> as
> > >>>> the
> > >>>>>>> record will never ever be sent to the broker. SWALLOW means drop
> > >>> the
> > >>>>>> record
> > >>>>>>> and continue/swallow the error.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> - What if the handle() method implements RETRY for
> > >>>>>> RecordTooLargeException?
> > >>>>>>> [Proposed Changes section]
> > >>>>>>>
> > >>>>>>> We have to limit the user to only have FAIL or SWALLOW for
> > >>>>>>> RecordTooLargeException. Actually, RETRY must be equal to FAIL.
> > >>> This
> > >>>> is
> > >>>>>>> well documented/informed in javadoc.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> - What if the handle() method of the handler throws an exception?
> > >>>>>>>
> > >>>>>>> The handler is expected to have correct code. If it throws an
> > >>>>> exception,
> > >>>>>>> everything fails.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> This is a PoC PR <https://github.com/apache/kafka/pull/15846>
> > >> ONLY
> > >>>> for
> > >>>>>>> RecordTooLargeException. The code changes related to
> > >>>>>>> UnknownTopicOrPartitionException will be added to this PR LATER.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> Looking forward to your feedback again.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> Cheers,
> > >>>>>>>
> > >>>>>>> Alieh
> > >>>>>>>
> > >>>>>>> On Thu, Apr 25, 2024 at 11:46 PM Kirk True <k...@kirktrue.pro>
> > >>>> wrote:
> > >>>>>>>
> > >>>>>>>> Hi Alieh,
> > >>>>>>>>
> > >>>>>>>> Thanks for the updates!
> > >>>>>>>>
> > >>>>>>>> Comments inline...
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>> On Apr 25, 2024, at 1:10 PM, Alieh Saeedi
> > >>>>>> <asae...@confluent.io.INVALID
> > >>>>>>>>
> > >>>>>>>> wrote:
> > >>>>>>>>>
> > >>>>>>>>> Hi all,
> > >>>>>>>>>
> > > >>>>>>>>> Thanks a lot for the constructive feedback!
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> Addressing some of the main concerns:
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> - The `RecordTooLargeException` can be thrown by broker,
> > >>> producer
> > >>>>> and
> > >>>>>>>>> consumer. Of course, the `ProducerExceptionHandler` interface
> > >>> is
> > >>>>>>>> introduced
> > >>>>>>>>> to affect only the exceptions thrown from the producer. This
> > >>> KIP
> > >>>>> very
> > >>>>>>>>> specifically means to provide a possibility to manage the
> > >>>>>>>>> `RecordTooLargeException` thrown from the Producer.send()
> > >>> method.
> > >>>>>>> Please
> > >>>>>>>>> see “Proposed Changes” section for more clarity. I
> > >> investigated
> > >>>> the
> > >>>>>>> issue
> > >>>>>>>>> there thoroughly. I hope it can explain the concern about how
> > >>> we
> > >>>>>> handle
> > >>>>>>>> the
> > >>>>>>>>> errors as well.
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> - The problem with Callback: Methods of Callback are called
> > >>> when
> > >>>>> the
> > >>>>>>>> record
> > >>>>>>>>> sent to the server is acknowledged, while this is not the
> > >>> desired
> > >>>>>> time
> > >>>>>>>> for
> > >>>>>>>>> all exceptions. We intend to handle exceptions beforehand.
> > >>>>>>>>
> > >>>>>>>> I guess it makes sense to keep the expectation for when
> > >> Callback
> > >>> is
> > >>>>>>>> invoked as-is vs. shoehorning more into it.
> > >>>>>>>>
> > >>>>>>>>> - What if the custom handler returns RETRY for
> > >>>>>>>> `RecordTooLargeException`? I
> > >>>>>>>>> assume changing the producer configuration at runtime is
> > >>>> possible.
> > >>>>> If
> > >>>>>>> so,
> > >>>>>>>>> RETRY for a too large record is valid because maybe in the
> > >> next
> > >>>>> try,
> > >>>>>>> the
> > >>>>>>>>> too large record is not poisoning any more. I am not 100%
> > >> sure
> > >>>>> about
> > >>>>>>> the
> > >>>>>>>>> technical details, though. Otherwise, we can consider the
> > >> RETRY
> > >>>> as
> > >>>>>> FAIL
> > >>>>>>>> for
> > >>>>>>>>> this exception. Another solution would be to consider a
> > >>> constant
> > >>>>>> number
> > >>>>>>>> of
> > >>>>>>>>> times for RETRY which can be useful for other exceptions as
> > >>> well.
> > >>>>>>>>
> > >>>>>>>> It’s not presently possible to change the configuration of an
> > >>>>> existing
> > >>>>>>>> Producer at runtime. So if a record hits a
> > >>> RecordTooLargeException
> > >>>>>> once,
> > >>>>>>> no
> > >>>>>>>> amount of retrying (with the current Producer) will change that
> > >>>> fact.
> > >>>>>> So
> > >>>>>>>> I’m still a little stuck on how to handle a response of RETRY
> > >> for
> > >>>> an
> > >>>>>>>> “oversized” record.
> > >>>>>>>>
> > >>>>>>>>> - What if the handle() method itself throws an exception? I
> > >>> think
> > >>>>>>>>> rationally and pragmatically, the behaviour must be exactly
> > >>> like
> > >>>>> when
> > >>>>>>> no
> > >>>>>>>>> custom handler is defined since the user actually did not
> > >> have
> > >>> a
> > >>>>>>> working
> > >>>>>>>>> handler.
> > >>>>>>>>
> > >>>>>>>> I’m not convinced that ignoring an errant handler is the right
> > >>>>> choice.
> > >>>>>> It
> > >>>>>>>> then becomes a silent failure that might have repercussions,
> > >>>>> depending
> > >>>>>> on
> > >>>>>>>> the business logic. A user would have to proactively trawl
> > >>> through
> > >>>>> the
> > >>>>>>>> logs for WARN/ERROR messages to catch it.
> > >>>>>>>>
> > >>>>>>>> Throwing a hard error is pretty draconian, though…
> > >>>>>>>>
> > >>>>>>>>> - Why not use config parameters instead of an interface? As
> > >>>>> explained
> > >>>>>>> in
> > >>>>>>>>> the “Rejected Alternatives” section, we assume that the
> > >> handler
> > >>>>> will
> > >>>>>> be
> > >>>>>>>>> used for a greater number of exceptions in the future.
> > >>> Defining a
> > >>>>>>>>> configuration parameter for each exception may make the
> > >>>>>> configuration a
> > >>>>>>>> bit
> > >>>>>>>>> messy. Moreover, the handler offers more flexibility.
> > >>>>>>>>
> > >>>>>>>> Agreed that the logic-via-configuration approach is weird and
> > >>>>> limiting.
> > >>>>>>>> Forget I ever suggested it ;)
> > >>>>>>>>
> > >>>>>>>> I’d think additional background in the Motivation section would
> > >>>> help
> > >>>>> me
> > >>>>>>>> understand how users might use this feature beyond a) skipping
> > >>>>>>> “oversized”
> > >>>>>>>> records, and b) not retrying missing topics.
> > >>>>>>>>
> > >>>>>>>>> Small change:
> > >>>>>>>>>
> > >>>>>>>>> - ProducerExceptionHandlerResponse -> Response for brevity
> > >> and
> > >>>>>>>> simplicity.
> > >>>>>>>>> Could’ve been HandlerResponse too I think!
> > >>>>>>>>
> > >>>>>>>> The name change sounds good to me.
> > >>>>>>>>
> > >>>>>>>> Thanks Alieh!
> > >>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> I thank you all again for your useful questions/suggestions.
> > >>>>>>>>>
> > >>>>>>>>> I would be happy to hear more of your concerns, as stated in
> > >>> some
> > >>>>>>>> feedback.
> > >>>>>>>>>
> > >>>>>>>>> Cheers,
> > >>>>>>>>> Alieh
> > >>>>>>>>>
> > >>>>>>>>> On Wed, Apr 24, 2024 at 12:31 AM Justine Olshan
> > >>>>>>>>> <jols...@confluent.io.invalid> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Thanks Alieh for the updates.
> > >>>>>>>>>>
> > >>>>>>>>>> I'm a little concerned about the design pattern here. It
> > >> seems
> > >>>>> like
> > >>>>>> we
> > >>>>>>>> want
> > >>>>>>>>>> specific usages, but we are packaging it as a generic
> > >> handler.
> > >>>>>>>>>> I think we tried to narrow down on the specific errors we
> > >> want
> > >>>> to
> > >>>>>>>> handle,
> > >>>>>>>>>> but it feels a little clunky as we have a generic thing for
> > >>> two
> > >>>>>>> specific
> > >>>>>>>>>> errors.
> > >>>>>>>>>>
> > >>>>>>>>>> I'm wondering if we are using the right patterns to solve
> > >>> these
> > >>>>>>>> problems. I
> > >>>>>>>>>> agree though that we will need something more than the error
> > >>>>> classes
> > >>>>>>> I'm
> > >>>>>>>>>> proposing if we want to have different handling be
> > >>> configurable.
> > >>>>>>>>>> My concern is that the open-endedness of a handler means
> > >> that
> > >>> we
> > >>>>> are
> > >>>>>>>>>> creating more problems than we are solving. It is still
> > >>> unclear
> > >>>> to
> > >>>>>> me
> > >>>>>>>> how
> > >>>>>>>>>> we expect to handle the errors. Perhaps we could include an
> > >>>>> example?
> > >>>>>>> It
> > >>>>>>>>>> seems like there is a specific use case in mind and maybe we
> > >>> can
> > >>>>>> make
> > >>>>>>> a
> > >>>>>>>>>> design that is tighter and supports that case.
> > >>>>>>>>>>
> > >>>>>>>>>> Justine
> > >>>>>>>>>>
> > >>>>>>>>>> On Tue, Apr 23, 2024 at 3:06 PM Kirk True <
> > >> k...@kirktrue.pro>
> > >>>>>> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> Hi Alieh,
> > >>>>>>>>>>>
> > >>>>>>>>>>> Thanks for the KIP!
> > >>>>>>>>>>>
> > >>>>>>>>>>> A few questions:
> > >>>>>>>>>>>
> > >>>>>>>>>>> K1. What is the expected behavior for the producer if it
> > >>>>> generates
> > >>>>>> a
> > >>>>>>>>>>> RecordTooLargeException, but the handler returns RETRY?
> > >>>>>>>>>>> K2. How do we determine which Record was responsible for
> > >> the
> > >>>>>>>>>>> UnknownTopicOrPartitionException since we get that response
> > >>>> when
> > >>>>>>>>>> sending a
> > >>>>>>>>>>> batch of records?
> > >>>>>>>>>>> K3. What is the expected behavior if the handle() method
> > >>> itself
> > >>>>>>> throws
> > >>>>>>>> an
> > >>>>>>>>>>> error?
> > >>>>>>>>>>> K4. What is the downside of adding an onError() method to
> > >> the
> > >>>>>>>> Producer’s
> > >>>>>>>>>>> Callback interface vs. a new mechanism?
> > >>>>>>>>>>> K5. Can we change “ProducerExceptionHandlerResponse” to
> > >> just
> > >>>>>>> “Response”
> > >>>>>>>>>>> given that it’s an inner enum?
> > >>>>>>>>>>> K6. Any recommendation for callback authors to handle
> > >>> different
> > >>>>>>>> behavior
> > >>>>>>>>>>> for different topics?
> > >>>>>>>>>>>
> > >>>>>>>>>>> I’ll echo what others have said, it would help me
> > >> understand
> > >>>> why
> > >>>>> we
> > >>>>>>>> want
> > >>>>>>>>>>> another handler class if there were more examples in the
> > >>>>> Motivation
> > >>>>>>>>>>> section. As it stands now, I agree with Chris that the
> > >> stated
> > >>>>>> issues
> > >>>>>>>>>> could
> > >>>>>>>>>>> be solved by adding two new configuration options:
> > >>>>>>>>>>>
> > >>>>>>>>>>>     oversized.record.behavior=fail
> > >>>>>>>>>>>     retry.on.unknown.topic.or.partition=true
> > >>>>>>>>>>>
> > >>>>>>>>>>> What I’m not yet able to wrap my head around is: what
> > >> exactly
> > >>>>> would
> > >>>>>>> the
> > >>>>>>>>>>> logic in the handler be? I’m not very imaginative, so I’m
> > >>>>> assuming
> > >>>>>>>> they’d
> > >>>>>>>>>>> mostly be if-this-then-that. However, if they’re more
> > >>>>> complicated,
> > >>>>>>> I’d
> > >>>>>>>>>> have
> > >>>>>>>>>>> other concerns.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Thanks,
> > >>>>>>>>>>> Kirk
> > >>>>>>>>>>>
> > >>>>>>>>>>>> On Apr 22, 2024, at 7:38 AM, Alieh Saeedi
> > >>>>>>>> <asae...@confluent.io.INVALID
> > >>>>>>>>>>>
> > >>>>>>>>>>> wrote:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Thank you all for the feedback!
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Addressing the main concern: The KIP is about giving the
> > >>> user
> > >>>>> the
> > >>>>>>>>>> ability
> > >>>>>>>>>>>> to handle producer exceptions, but to be more conservative
> > >>> and
> > >>>>>> avoid
> > >>>>>>>>>>> future
> > >>>>>>>>>>>> issues, we decided to be limited to a short list of
> > >>>> exceptions.
> > >>>>> I
> > >>>>>>>>>>> included
> > >>>>>>>>>>>> *RecordTooLargeException* and
> > >>>> *UnknownTopicOrPartitionException*.
> > >>>>>>> Open
> > >>>>>>>>>> to
> > >>>>>>>>>>>> suggestions for adding some more ;-)
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> KIP Updates:
> > >>>>>>>>>>>> - clarified the way that the user should configure the
> > >>>> Producer
> > >>>>> to
> > >>>>>>> use
> > >>>>>>>>>>> the
> > >>>>>>>>>>>> custom handler. I think adding a producer config property
> > >> is
> > >>>> the
> > >>>>>>>>>> cleanest
> > >>>>>>>>>>>> one.
> > >>>>>>>>>>>> - changed the *ClientExceptionHandler* to
> > >>>>>> *ProducerExceptionHandler*
> > >>>>>>>> to
> > >>>>>>>>>>> be
> > >>>>>>>>>>>> closer to what we are changing.
> > >>>>>>>>>>>> - added the ProducerRecord as the input parameter of the
> > >>>>> handle()
> > >>>>>>>>>> method
> > >>>>>>>>>>> as
> > >>>>>>>>>>>> well.
> > >>>>>>>>>>>> - increased the response types to 3 to have fail and two
> > >>> types
> > >>>>> of
> > >>>>>>>>>>> continue.
> > >>>>>>>>>>>> - The default behaviour is having no custom handler,
> > >> having
> > >>>> the
> > >>>>>>>>>>>> corresponding config parameter set to null. Therefore, the
> > >>> KIP
> > >>>>>>>> provides
> > >>>>>>>>>>> no
> > >>>>>>>>>>>> default implementation of the interface.
> > >>>>>>>>>>>> - We follow the interface solution as described in the
> > >>>>>>>>>>>> Rejected Alternatives section.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>> Alieh
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On Thu, Apr 18, 2024 at 8:11 PM Matthias J. Sax <
> > >>>>> mj...@apache.org
> > >>>>>>>
> > >>>>>>>>>>> wrote:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> Thanks for the KIP Alieh! It addresses an important case
> > >>> for
> > >>>>>> error
> > >>>>>>>>>>>>> handling.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> I agree that using this handler would be an expert API,
> > >> as
> > >>>>>>> mentioned
> > >>>>>>>>>> by
> > >>>>>>>>>>>>> a few people. But I don't think it would be a reason to
> > >> not
> > >>>> add
> > >>>>>> it.
> > >>>>>>>>>> It's
> > >>>>>>>>>>>>> always a tricky tradeoff what to expose to users and to
> > >>> avoid
> > >>>>>> foot
> > >>>>>>>>>> guns,
> > >>>>>>>>>>>>> but we added similar handlers to Kafka Streams, and have
> > >>> good
> > >>>>>>>>>> experience
> > >>>>>>>>>>>>> with it. Hence, I understand, but don't share the concern
> > >>>>> raised.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> I also agree that there is some responsibility by the
> > >> user
> > >>> to
> > >>>>>>>>>> understand
> > >>>>>>>>>>>>> how such a handler should be implemented to not drop data
> > >>> by
> > >>>>>>>> accident.
> > >>>>>>>>>>>>> But it seems unavoidable and acceptable.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> While I understand that a "simpler / reduced" API (eg via
> > >>>>>> configs)
> > >>>>>>>>>> might
> > >>>>>>>>>>>>> also work, I personally prefer a full handler. Configs
> > >> have
> > >>>> the
> > >>>>>>> same
> > >>>>>>>>>>>>> issue that they could be misused, potentially leading to
> > >>>>>>> incorrectly
> > >>>>>>>>>>>>> dropped data, but at the same time are less flexible (and
> > >>>> thus
> > >>>>>>> maybe
> > >>>>>>>>>>>>> even harder to use correctly...?). Based on my experience,
> > >>>> there
> > >>>>>> is
> > >>>>>>>>>> also
> > >>>>>>>>>>>>> often a weird corner case for which it makes sense to also
> > >>> drop
> > >>>>>>> records
> > >>>>>>>>>> for
> > >>>>>>>>>>>>> other exceptions, and a full handler has the advantage of
> > >>>> full
> > >>>>>>>>>>>>> flexibility and "absolute power!".
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> To be fair: I don't know the exact code paths of the
> > >>> producer
> > >>>>> in
> > >>>>>>>>>>>>> detail, so please keep me honest. But my understanding
> > >> is,
> > >>>>> that
> > >>>>>>> the
> > >>>>>>>>>> KIP
> > >>>>>>>>>>>>> aims to allow users to react to internal exceptions, and
> > >>>> decide
> > >>>>> to
> > >>>>>>>> keep
> > >>>>>>>>>>>>> retrying internally, swallow the error and drop the
> > >> record,
> > >>>> or
> > >>>>>>> raise
> > >>>>>>>>>> the
> > >>>>>>>>>>>>> error?
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Maybe the KIP would need to be a little bit more precise about
> > >>>> what
> > >>>>>>> errors
> > >>>>>>>>>> we
> > >>>>>>>>>>>>> want to cover -- I don't think this list must be
> > >>> exhaustive,
> > >>>> as
> > >>>>>> we
> > >>>>>>>> can
> > >>>>>>>>>>>>> always do a follow-up KIP to also apply the handler to
> > >> other
> > >>>>> errors
> > >>>>>>> to
> > >>>>>>>>>>>>> expand the scope of the handler. The KIP does mention
> > >>>> examples,
> > >>>>>> but
> > >>>>>>>> it
> > >>>>>>>>>>>>> might be good to explicitly state for what cases the
> > >>> handler
> > >>>>> gets
> > >>>>>>>>>>> applied?
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> I am also not sure if CONTINUE and FAIL are enough
> > >> options?
> > >>>>> Don't
> > >>>>>>> we
> > >>>>>>>>>>>>> need three options? Or would `CONTINUE` have different
> > >>>> meaning
> > >>>>>>>>>> depending
> > >>>>>>>>>>>>> on the type of error? Ie, for a retryable error
> > >> `CONTINUE`
> > >>>>> would
> > >>>>>>> mean
> > >>>>>>>>>>>>> keep retrying internally, but for a non-retryable error
> > >>>>>> `CONTINUE`
> > >>>>>>>>>> means
> > >>>>>>>>>>>>> swallow the error and drop the record? This semantic
> > >>> overload
> > >>>>>> seems
> > >>>>>>>>>>>>> tricky for users to reason about, so it might be better to
> > >>> split
> > >>>>>>>>>> `CONTINUE`
> > >>>>>>>>>>>>> into two cases -> `RETRY` and `SWALLOW` (or some better
> > >>>> names).
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Additionally, should we just ship a
> > >>>>>> `DefaultClientExceptionHandler`
> > >>>>>>>>>>>>> which would return `FAIL`, for backward compatibility? Or
> > >>>> don't
> > >>>>>>> have
> > >>>>>>>>>> any
> > >>>>>>>>>>>>> default handler to begin with and allow it to be `null`?
> > >> I
> > >>>>> don't
> > >>>>>>> see
> > >>>>>>>>>> the
> > >>>>>>>>>>>>> need for a specific `TransactionExceptionHandler`. To me,
> > >>> the
> > >>>>>> goal
> > >>>>>>>>>>>>> should be to not modify the default behavior at all, but
> > >> to
> > >>>>> just
> > >>>>>>>> allow
> > >>>>>>>>>>>>> users to change the default behavior if there is a need.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> What is missing from the KIP, though, is how the handler is
> > >>>> passed
> > >>>>>>> into
> > >>>>>>>>>> the
> > >>>>>>>>>>>>> producer. Would we need a new config which allows
> > >>> to
> > >>>>> set
> > >>>>>> a
> > >>>>>>>>>>>>> custom handler? And/or would we allow to pass in an
> > >>> instance
> > >>>>> via
> > >>>>>>> the
> > >>>>>>>>>>>>> constructor or add a new method to set a handler?
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> -Matthias
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> On 4/18/24 10:02 AM, Andrew Schofield wrote:
> > >>>>>>>>>>>>>> Hi Alieh,
> > >>>>>>>>>>>>>> Thanks for the KIP.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Exception handling in the Kafka producer and consumer is
> > >>>>> really
> > >>>>>>> not
> > >>>>>>>>>>>>> ideal.
> > >>>>>>>>>>>>>> It’s even harder working out what’s going on with the
> > >>>>> consumer.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> I’m a bit nervous about this KIP and I agree with Chris
> > >>> that
> > >>>>> it
> > >>>>>>>> could
> > >>>>>>>>>>> do
> > >>>>>>>>>>>>> with additional
> > >>>>>>>>>>>>>> motivation. This would be an expert-level interface
> > >> given
> > >>>> how
> > >>>>>>>>>>> complicated
> > >>>>>>>>>>>>>> the exception handling for Kafka has become.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> 7. The application is not really aware of the batching
> > >>> being
> > >>>>>> done
> > >>>>>>> on
> > >>>>>>>>>>> its
> > >>>>>>>>>>>>> behalf.
> > >>>>>>>>>>>>>> The ProduceResponse can actually return an array of
> > >>> records
> > >>>>>> which
> > >>>>>>>>>>> failed
> > >>>>>>>>>>>>>> per batch. If you get RecordTooLargeException, and want
> > >> to
> > >>>>>> retry,
> > >>>>>>>> you
> > >>>>>>>>>>>>> probably
> > >>>>>>>>>>>>>> need to remove the offending records from the batch and
> > >>>> retry
> > >>>>>> it.
> > >>>>>>>>>> This
> > >>>>>>>>>>>>> is getting fiddly.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> 8. There is already o.a.k.clients.producer.Callback. I
> > >>>> wonder
> > >>>>>>>> whether
> > >>>>>>>>>>> an
> > >>>>>>>>>>>>>> alternative might be to add a method to the existing
> > >>>> Callback
> > >>>>>>>>>>> interface,
> > >>>>>>>>>>>>> such as:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>   ClientExceptionResponse onException(Exception
> > >> exception)
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> It would be called when a ProduceResponse contains an
> > >>> error,
> > >>>>> but
> > >>>>>>> the
> > >>>>>>>>>>>>>> producer is going to retry. It tells the producer
> > >> whether
> > >>> to
> > >>>>> go
> > >>>>>>>> ahead
> > >>>>>>>>>>>>> with the retry
> > >>>>>>>>>>>>>> or not. The default implementation would be to CONTINUE,
> > >>>>> because
> > >>>>>>>>>> that’s
> > >>>>>>>>>>>>>> just continuing to retry as planned. Note that this is a
> > >>>>>>> per-record
> > >>>>>>>>>>>>> callback, so
> > >>>>>>>>>>>>>> the application would be able to understand which
> > >> records
> > >>>>>> failed.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> By using an existing interface, we already know how to
> > >>>>> configure
> > >>>>>>> it
> > >>>>>>>>>> and
> > >>>>>>>>>>>>> we know
> > >>>>>>>>>>>>>> about the threading model for calling it.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>> Andrew
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On 17 Apr 2024, at 18:17, Chris Egerton
> > >>>>>> <chr...@aiven.io.INVALID
> > >>>>>>>>
> > >>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Hi Alieh,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Thanks for the KIP! The issue with writing to
> > >>> non-existent
> > >>>>>> topics
> > >>>>>>>> is
> > >>>>>>>>>>>>>>> particularly frustrating for users of Kafka Connect and
> > >>> has
> > >>>>>> been
> > >>>>>>>> the
> > >>>>>>>>>>>>> source
> > >>>>>>>>>>>>>>> of a handful of Jira tickets over the years. My
> > >> thoughts:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> 1. An additional detail we can add to the motivation
> > >> (or
> > >>>>>> possibly
> > >>>>>>>>>>>>> rejected
> > >>>>>>>>>>>>>>> alternatives) section is that this kind of custom retry
> > >>>> logic
> > >>>>>>> can't
> > >>>>>>>>>> be
> > >>>>>>>>>>>>>>> implemented by hand by, e.g., setting retries to 0 in
> > >> the
> > >>>>>>> producer
> > >>>>>>>>>>>>> config
> > >>>>>>>>>>>>>>> and handling exceptions at the application level. Or
> > >>>> rather,
> > >>>>> it
> > >>>>>>>> can,
> > >>>>>>>>>>>>> but 1)
> > >>>>>>>>>>>>>>> it's a bit painful to have to reimplement at every
> > >>>> call-site
> > >>>>>> for
> > >>>>>>>>>>>>>>> Producer::send (and any code that awaits the returned
> > >>>> Future)
> > >>>>>> and
> > >>>>>>>> 2)
> > >>>>>>>>>>>>> it's
> > >>>>>>>>>>>>>>> impossible to do this without losing idempotency on
> > >>>> retries.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> 2. That said, I wonder if a pluggable interface is
> > >> really
> > >>>> the
> > >>>>>>> right
> > >>>>>>>>>>> call
> > >>>>>>>>>>>>>>> here. Documenting the interactions of a producer with
> > >>>>>>>>>>>>>>> a ClientExceptionHandler instance will be tricky, and
> > >>>>>>> implementing
> > >>>>>>>>>>> them
> > >>>>>>>>>>>>>>> will also be a fair amount of work. I believe that
> > >> there
> > >>>>> needs
> > >>>>>> to
> > >>>>>>>> be
> > >>>>>>>>>>>>> some
> > >>>>>>>>>>>>>>> more granularity for how writes to non-existent topics
> > >>> (or
> > >>>>>>> really,
> > >>>>>>>>>>>>>>> UNKNOWN_TOPIC_OR_PARTITION and related errors from the
> > >>>>> broker)
> > >>>>>>> are
> > >>>>>>>>>>>>> handled,
> > >>>>>>>>>>>>>>> but I'm torn between keeping it simple with maybe one
> > >> or
> > >>>> two
> > >>>>>> new
> > >>>>>>>>>>>>> producer
> > >>>>>>>>>>>>>>> config properties, or a full-blown pluggable interface.
> > >>> If
> > >>>>>> there
> > >>>>>>>> are
> > >>>>>>>>>>>>> more
> > >>>>>>>>>>>>>>> cases that would benefit from a pluggable interface, it
> > >>>> would
> > >>>>>> be
> > >>>>>>>>>> nice
> > >>>>>>>>>>> to
> > >>>>>>>>>>>>>>> identify these and add them to the KIP to strengthen
> > >> the
> > >>>>>>>> motivation.
> > >>>>>>>>>>>>> Right
> > >>>>>>>>>>>>>>> now, I'm not sure the two that are mentioned in the
> > >>>>> motivation
> > >>>>>>> are
> > >>>>>>>>>>>>>>> sufficient.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> 3. Alternatively, a possible compromise is for this KIP
> > >>> to
> > >>>>>>>> introduce
> > >>>>>>>>>>> new
> > >>>>>>>>>>>>>>> properties that dictate how to handle
> > >>>> unknown-topic-partition
> > >>>>>> and
> > >>>>>>>>>>>>>>> record-too-large errors, with the thinking that if we
> > >>>>>> introduce a
> > >>>>>>>>>>>>> pluggable
> > >>>>>>>>>>>>>>> interface later on, these properties will be recognized
> > >>> by
> > >>>>> the
> > >>>>>>>>>> default
> > >>>>>>>>>>>>>>> implementation of that interface but could be
> > >> completely
> > >>>>>> ignored
> > >>>>>>> or
> > >>>>>>>>>>>>>>> replaced by alternative implementations.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> 4. (Nit) You can remove the "This page is meant as a
> > >>>> template
> > >>>>>> for
> > >>>>>>>>>>>>> writing a
> > >>>>>>>>>>>>>>> KIP..." part from the KIP. It's not a template anymore
> > >> :)
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> 5. If we do go the pluggable interface route, wouldn't
> > >> we
> > >>>>> want
> > >>>>>> to
> > >>>>>>>>>> add
> > >>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>> possibility for retry logic? The simplest version of
> > >> this
> > >>>>> could
> > >>>>>>> be
> > >>>>>>>>>> to
> > >>>>>>>>>>>>> add a
> > >>>>>>>>>>>>>>> RETRY value to the ClientExceptionHandlerResponse enum.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> 6. I think "SKIP" or "DROP" might be clearer instead of
> > >>>>>>> "CONTINUE"
> > >>>>>>>>>> for
> > >>>>>>>>>>>>>>> the ClientExceptionHandlerResponse enum, since they
> > >> cause
> > >>>>>> records
> > >>>>>>>> to
> > >>>>>>>>>>> be
> > >>>>>>>>>>>>>>> dropped.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Chris
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On Wed, Apr 17, 2024 at 12:25 PM Justine Olshan
> > >>>>>>>>>>>>>>> <jols...@confluent.io.invalid> wrote:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Hey Alieh,
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> I echo what Omnia says, I'm not sure I understand the
> > >>>>>>> implications
> > >>>>>>>>>> of
> > >>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>> change and I think more detail is needed.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> This comment also confused me a bit:
> > >>>>>>>>>>>>>>>> * {@code ClientExceptionHandler} that continues the
> > >>>>>> transaction
> > >>>>>>>>>> even
> > >>>>>>>>>>>>> if a
> > >>>>>>>>>>>>>>>> record is too large.
> > >>>>>>>>>>>>>>>> * Otherwise, it makes the transaction to fail.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Relatedly, I've been working with some folks on a KIP
> > >>> for
> > >>>>>>>>>>> transactions
> > >>>>>>>>>>>>>>>> errors and how they are handled. Specifically for the
> > >>>>>>>>>>>>>>>> RecordTooLargeException (and a few other errors), we
> > >>> want
> > >>>> to
> > >>>>>>> give
> > >>>>>>>> a
> > >>>>>>>>>>> new
> > >>>>>>>>>>>>>>>> error category for this error that allows the
> > >>> application
> > >>>> to
> > >>>>>>>> choose
> > >>>>>>>>>>>>> how it
> > >>>>>>>>>>>>>>>> is handled. Maybe this KIP is something that you are
> > >>>> looking
> > >>>>>>> for?
> > >>>>>>>>>>> Stay
> > >>>>>>>>>>>>>>>> tuned :)
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Justine
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> On Wed, Apr 17, 2024 at 8:03 AM Omnia Ibrahim <
> > >>>>>>>>>>> o.g.h.ibra...@gmail.com
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Hi Alieh,
> > >>>>>>>>>>>>>>>>> Thanks for the KIP! I have a couple of comments:
> > >>>>>>>>>>>>>>>>> - You mentioned in the KIP motivation,
> > >>>>>>>>>>>>>>>>>> Another example for which a production exception
> > >>> handler
> > >>>>>> could
> > >>>>>>>> be
> > >>>>>>>>>>>>>>>> useful
> > >>>>>>>>>>>>>>>>> is if a user tries to write into a non-existing
> > >> topic,
> > >>>>> which
> > >>>>>>>>>>> returns a
> > >>>>>>>>>>>>>>>>> retryable error code; with infinite retries, the
> > >>> producer
> > >>>>>> would
> > >>>>>>>>>> hang
> > >>>>>>>>>>>>>>>>> retrying forever. A handler could help to break the
> > >>>>> infinite
> > >>>>>>>> retry
> > >>>>>>>>>>>>> loop.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> How can the handler differentiate between something
> > >>> that
> > >>>> is
> > >>>>>>>>>>> temporary,
> > >>>>>>>>>>>>> where
> > >>>>>>>>>>>>>>>>> it should keep retrying, and something permanent, like
> > >>>> forgetting
> > >>>>>> to
> > >>>>>>>>>>> create
> > >>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>> topic? Temporary here could be:
> > >>>>>>>>>>>>>>>>> the producer gets deployed before the topic creation
> > >>>> finishes
> > >>>>>>>>>>> (especially
> > >>>>>>>>>>>>> if
> > >>>>>>>>>>>>>>>>> the topic creation is handled via IaC)
> > >>>>>>>>>>>>>>>>> temporary offline partitions
> > >>>>>>>>>>>>>>>>> leadership changing
> > >>>>>>>>>>>>>>>>>        Isn’t this putting the producer at risk of
> > >>> dropping
> > >>>>>>> records
> > >>>>>>>>>>>>>>>>> unintentionally?
> > >>>>>>>>>>>>>>>>> - Can you elaborate more on what is written in the
> > >>>>>>> compatibility
> > >>>>>>>> /
> > >>>>>>>>>>>>>>>>> migration plan section please by explaining in a bit
> > >> more
> > >>>>>> detail
> > >>>>>>>>>> what
> > >>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>> changing behaviour and how this will impact clients
> > >> who
> > >>>> are
> > >>>>>>>>>>> upgrading?
> > >>>>>>>>>>>>>>>>> - In the proposed changes, can you elaborate in the
> > >> KIP
> > >>>>> where
> > >>>>>> in
> > >>>>>>>>>> the
> > >>>>>>>>>>>>>>>>> producer lifecycle will ClientExceptionHandler and
> > >>>>>>>>>>>>>>>>> TransactionExceptionHandler get triggered, and how
> > >> will
> > >>>> the
> > >>>>>>>>>> producer
> > >>>>>>>>>>>>>>>>> configure them to point to a custom implementation?
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Thanks
> > >>>>>>>>>>>>>>>>> Omnia
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> On 17 Apr 2024, at 13:13, Alieh Saeedi
> > >>>>>>>>>>> <asae...@confluent.io.INVALID
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Hi all,
> > >>>>>>>>>>>>>>>>>> Here is the KIP-1038: Add Custom Error Handler to
> > >>>>> Producer.
> > >>>>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-1038%3A+Add+Custom+Error+Handler+to+Producer>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> I look forward to your feedback!
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>>>>>> Alieh
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> > >
> >
>
