Oh I see. The type isn't the error type but a newly defined type for the
response. Makes sense and works for me.

Justine

On Mon, May 13, 2024 at 9:13 AM Chris Egerton <fearthecel...@gmail.com>
wrote:

> If we have dedicated methods for each kind of exception
> (handleRecordTooLarge, handleUnknownTopicOrPartition, etc.), doesn't that
> provide sufficient constraint? I'm not suggesting we eliminate these
> methods, just that we change their return types to something more flexible.
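
To make the suggestion above concrete, here is a minimal Java sketch of dedicated handler methods whose return types are shared across exceptions. It assumes two response enums named after the RetriableResponse and NonRetriableResponse mentioned later in the thread; the interface name, the method parameters, and the enum constants are hypothetical stand-ins and do not exist in the Kafka client API.

```java
// Hypothetical stand-ins; none of these types exist in the Kafka client API.
enum RetriableResponse { FAIL, SWALLOW, RETRY }   // for errors where a retry can help
enum NonRetriableResponse { FAIL, SWALLOW }       // no RETRY constant to return by mistake

// One dedicated method per exception, but only two shared return types.
interface ProducerExceptionHandlerSketch {
    NonRetriableResponse handleRecordTooLarge(String topic, int recordBytes);

    RetriableResponse handleUnknownTopicOrPartition(String topic);
}

// Example implementation: drop oversized records, keep retrying unknown topics.
class LenientHandler implements ProducerExceptionHandlerSketch {
    @Override
    public NonRetriableResponse handleRecordTooLarge(String topic, int recordBytes) {
        // Returning RETRY here would not compile, because the enum has no such constant.
        return NonRetriableResponse.SWALLOW;
    }

    @Override
    public RetriableResponse handleUnknownTopicOrPartition(String topic) {
        return RetriableResponse.RETRY;
    }
}
```

In this shape the method list still limits which exceptions are covered, while the return type decides which answers are legal for each one.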
>
> On Mon, May 13, 2024, 12:07 Justine Olshan <jols...@confluent.io.invalid>
> wrote:
>
> > I'm not sure I agree with the Retriable and NonRetriableResponse comment.
> > This doesn't limit the blast radius or enforce certain errors are used.
> > I think we might disagree on how controlled these interfaces can be...
> >
> > Justine
> >
> > On Mon, May 13, 2024 at 8:40 AM Chris Egerton <chr...@aiven.io.invalid>
> > wrote:
> >
> > > Hi Alieh,
> > >
> > > Thanks for the updates! I just have a few more thoughts:
> > >
> > > - I don't think a boolean property is sufficient to dictate retries for
> > > unknown topic partitions, though. These errors can occur if a topic has
> > > just been created, which can occur if, for example, automatic topic
> > > creation is enabled for a multi-task connector. This is why I proposed
> a
> > > timeout instead of a boolean (and see my previous email for why
> reducing
> > > max.block.ms for a producer is not a viable alternative). If it helps,
> > one
> > > way to reproduce this yourself is to add the line
> > > `fooProps.put(TASKS_MAX_CONFIG, "10");` to the integration test here:
> > >
> > >
> >
> https://github.com/apache/kafka/blob/5439914c32fa00d634efa7219699f1bc21add839/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java#L134
> > > and then check the logs afterward for messages like "Error while
> fetching
> > > metadata with correlation id <n> :
> > {foo-topic=UNKNOWN_TOPIC_OR_PARTITION}".
> > >
> > > - I also don't think we need custom XxxResponse enums for every
> possible
> > > method; it seems like this will lead to a lot of duplication and
> > cognitive
> > > overhead if we want to expand the error handler in the future.
> Something
> > > more flexible like RetriableResponse and NonRetriableResponse could
> > > suffice.
> > >
> > > > - Finally, the KIP still doesn't state how the handler will or won't take
> > > > precedence over existing retry properties. If I set `retries` or
> > > > `delivery.timeout.ms` or `max.block.ms` to low values, will that cause
> > > > retries to cease even if my custom handler would otherwise keep returning
> > > > RETRY for an error?
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> > > On Mon, May 13, 2024 at 11:02 AM Andrew Schofield <
> > > andrew_schofi...@live.com>
> > > wrote:
> > >
> > > > Hi Alieh,
> > > > Just a few more comments on the KIP. It is looking much less risky
> now
> > > the
> > > > scope
> > > > is tighter.
> > > >
> > > > [AJS1] It would be nice to have default implementations of the handle
> > > > methods
> > > > so an implementor would not need to implement both themselves.
> > > >
> > > > [AJS2] Producer configurations which are class names usually end in
> > > > “.class”.
> > > > I suggest “custom.exception.handler.class”.
> > > >
> > > > [AJS3] If I implemented a handler, and I set a non-default value for
> > one
> > > > of the
> > > > > new configurations, what happens? I would expect that the handler
> takes
> > > > precedence. I wasn’t quite clear what “the control will follow the
> > > handler
> > > > instructions” meant.
> > > >
> > > > [AJS4] Because you now have an enum for the
> > > > RecordTooLargeExceptionResponse,
> > > > I don’t think you need to state in the comment for
> > > > ProducerExceptionHandler that
> > > > RETRY will be interpreted as FAIL.
> > > >
> > > > Thanks,
> > > > Andrew
> > > >
> > > > > On 13 May 2024, at 14:53, Alieh Saeedi
> <asae...@confluent.io.INVALID
> > >
> > > > wrote:
> > > > >
> > > > > Hi all,
> > > > >
> > > > >
> > > > > Thanks for the very interesting discussion during my PTO.
> > > > >
> > > > >
> > > > > KIP updates and addressing concerns:
> > > > >
> > > > >
> > > > > > 1) Two handle() methods are defined in ProducerExceptionHandler for the two
> > > > > > exceptions with different input parameters so that we have
> > > > > > handle(RecordTooLargeException e, ProducerRecord record) and
> > > > > > handle(UnknownTopicOrPartitionException e, ProducerRecord record)
> > > > > >
> > > > > >
> > > > > > 2) The ProducerExceptionHandler extends `Closeable` as well.
> > > > >
> > > > >
> > > > > 3) The KIP suggests having two more configuration parameters with
> > > boolean
> > > > > values:
> > > > >
> > > > > - `drop.invalid.large.records` with a default value of `false` for
> > > > > swallowing too large records.
> > > > >
> > > > > > - `retry.unknown.topic.partition` with a default value of `true` that
> > > > > > performs RETRY for `max.block.ms` ms when encountering the
> > > > > > UnknownTopicOrPartitionException.
> > > > >
> > > > >
> > > > > Hope the main concerns are addressed so that we can go forward with
> > > > voting.
> > > > >
> > > > >
> > > > > Cheers,
> > > > >
> > > > > Alieh
> > > > >
> > > > > On Thu, May 9, 2024 at 11:25 PM Artem Livshits
> > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > >
> > > > > >> Hi Matthias,
> > > > >>
> > > > >>> [AL1] While I see the point, I would think having a different
> > > callback
> > > > >> for every exception might not really be elegant?
> > > > >>
> > > > >> I'm not sure how to assess the level of elegance of the proposal,
> > but
> > > I
> > > > can
> > > > >> comment on the technical characteristics:
> > > > >>
> > > > > >> 1. Having specific interfaces that codify the logic that is currently
> > > > > >> prescribed in the comments reduces the chance of making a mistake.
> > > > > >> Comments may get ignored, misunderstood, etc., but if the contract is
> > > > > >> codified, the compiler will help to enforce the contract.
> > > > > >> 2. Given that the logic is trickier than it seems (the record-too-large
> > > > > >> case is an example that can easily confuse someone who's not intimately
> > > > > >> familiar with the nuances of the batching logic), having a few more
> > > > > >> hoops to jump through would give a greater chance that whoever tries to
> > > > > >> add a new case pauses and thinks a bit more.
> > > > > >> 3. As Justine pointed out, having different methods will be a forcing
> > > > > >> function to go through a KIP rather than smuggle new cases through
> > > > > >> implementation.
> > > > > >> 4. Sort of a consequence of the previous 3 -- all those things reduce
> > > > > >> the chance of someone writing code that works with 2 errors and then,
> > > > > >> when more errors are added in the future, suddenly and incorrectly
> > > > > >> ignores the new errors (the example I gave in the previous email).
> > > > >>
> > > > > >>> [AL2 cont.] Similar to AL1, I see such a handler to some extent as
> > > > > >> business logic. If a user puts a bad filter condition in their KS app,
> > > > > >> and drops messages
> > > > >>
> > > > > >> I agree that there is always a chance to get a bug and lose messages,
> > > > > >> but there is generally a separation of concerns with different risk
> > > > > >> profiles: the filtering logic may be more rigorously tested and rarely
> > > > > >> changed (say an application developer does it), but setting the topics
> > > > > >> to produce to may be done via configuration (e.g. a user of the
> > > > > >> application does it) and it's generally an expectation that users would
> > > > > >> get an error when configuration is incorrect.
> > > > >>
> > > > >> What could be worse is that UnknownTopicOrPartitionException can
> be
> > an
> > > > >> intermittent error, i.e. with a generally correct configuration,
> > there
> > > > >> could be metadata propagation problem on the cluster and then a
> > random
> > > > set
> > > > >> of records could get lost.
> > > > >>
> > > > >>> [AL3] Maybe I misunderstand what you are saying, but to me,
> > checking
> > > > the
> > > > >> size of the record upfront is exactly what the KIP proposes? No?
> > > > >>
> > > > >> It achieves the same result but solves it differently, my
> proposal:
> > > > >>
> > > > >> 1. Application checks the validity of a record (maybe via a new
> > > > >> validateRecord method) before producing it, and can just exclude
> it
> > or
> > > > >> return an error to the user.
> > > > >> 2. Application produces the record -- at this point there are no
> > > records
> > > > >> that could return record too large, they were either skipped at
> > step 1
> > > > or
> > > > >> we didn't get here because step 1 failed.
> > > > >>
> > > > >> Vs. KIP's proposal
> > > > >>
> > > > >> 1. Application produces the record.
> > > > >> 2. Application gets a callback.
> > > > >> 3. Application returns the action on how to proceed.
> > > > >>
> > > > >> The advantage of the former is the clarity of semantics -- the
> > record
> > > is
> > > > >> invalid (property of the record, not a function of server state or
> > > > server
> > > > >> configuration) and we can clearly know that it is the record that
> is
> > > bad
> > > > >> and can never succeed.
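
The first flow above (validate, then produce) can be sketched in plain Java. This is a minimal illustration under stated assumptions: the class and method names are hypothetical, the producer has no public validateRecord method today, and the size estimate only adds key and value bytes, whereas the real producer also accounts for headers and batch overhead.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; not part of the Kafka producer API.
class RecordValidationSketch {
    // Rough size estimate: key bytes plus value bytes. Treat it as a lower
    // bound, since headers and batch overhead are ignored here.
    static boolean fitsLocally(byte[] key, byte[] value, int maxRequestSize) {
        int keyLen = key == null ? 0 : key.length;
        int valueLen = value == null ? 0 : value.length;
        return keyLen + valueLen <= maxRequestSize;
    }

    // Step 1 of the first flow: keep only records that pass the local check,
    // so that step 2 (the actual send) never sees a locally oversized record.
    static List<String> selectSendable(List<String> values, int maxRequestSize) {
        List<String> sendable = new ArrayList<>();
        for (String v : values) {
            if (fitsLocally(null, v.getBytes(StandardCharsets.UTF_8), maxRequestSize)) {
                sendable.add(v);
            }
        }
        return sendable;
    }
}
```

The check depends only on the record and the locally configured limit, which is the "property of the record" argument made above. It cannot catch a batch rejected by the broker.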
> > > > >>
> > > > >> The KIP-proposed way actually has a very tricky point: it actually
> > > > handles
> > > > >> a subset of record-too-large exceptions.  The broker can return
> > > > >> record-too-large and reject the whole batch (but we don't want to
> > > ignore
> > > > >> those because then we can skip random records that just happened
> to
> > be
> > > > in
> > > > >> the same batch), in some sense we use the same error for 2
> different
> > > > >> conditions and understanding that requires pretty deep
> understanding
> > > of
> > > > >> Kafka internals.
> > > > >>
> > > > >> -Artem
> > > > >>
> > > > >>
> > > > >> On Wed, May 8, 2024 at 9:47 AM Justine Olshan
> > > > <jols...@confluent.io.invalid
> > > > >>>
> > > > >> wrote:
> > > > >>
> > > > >>> My concern with respect to it being fragile: the code that
> ensures
> > > the
> > > > >>> error type is internal to the producer. Someone may see it and
> > say, I
> > > > >> want
> > > > >>> to add such and such error. This looks like internal code, so I
> > don't
> > > > >> need
> > > > >>> a KIP, and then they can change it to whatever they want thinking
> > it
> > > is
> > > > >>> within the typical kafka improvement protocol.
> > > > >>>
> > > > >>> Relying on an internal change to enforce an external API is
> fragile
> > > in
> > > > my
> > > > >>> opinion. That's why I sort of agreed with Artem with enforcing
> the
> > > > error
> > > > >> in
> > > > >>> the method signature -- part of the public API.
> > > > >>>
> > > > > >>> Chris's comments on requiring more information for the handler again
> > > > > >>> make me
> > > > > >>> wonder if we are solving a problem of lack of information at the
> > > > >>> application level with a more powerful solution than we need.
> (Ie,
> > if
> > > > we
> > > > >>> had more information, could the application close and restart the
> > > > >>> transaction rather than having to drop records) But I am happy to
> > > > >>> compromise with a handler that we can agree is sufficiently
> > > controlled
> > > > >> and
> > > > >>> documented.
> > > > >>>
> > > > >>> Justine
> > > > >>>
> > > > >>> On Wed, May 8, 2024 at 7:20 AM Chris Egerton
> > <chr...@aiven.io.invalid
> > > >
> > > > >>> wrote:
> > > > >>>
> > > > >>>> Hi Alieh,
> > > > >>>>
> > > > >>>> Continuing prior discussions:
> > > > >>>>
> > > > >>>> 1) Regarding the "flexibility" discussion, my overarching point
> is
> > > > >> that I
> > > > >>>> don't see the point in allowing for this kind of pluggable logic
> > > > >> without
> > > > >>>> also covering more scenarios. Take example 2 in the KIP: if
> we're
> > > > going
> > > > >>> to
> > > > >>>> implement retries only on "important" topics when a topic
> > partition
> > > > >> isn't
> > > > >>>> found, why wouldn't we also want to be able to do this for other
> > > > >> errors?
> > > > >>>> Again, taking authorization errors as an example, why wouldn't
> we
> > > want
> > > > >> to
> > > > >>>> be able to fail when we can't write to "important" topics
> because
> > > the
> > > > >>>> producer principal lacks sufficient ACLs, and drop the record if
> > the
> > > > >>> topic
> > > > >>>> isn't "important"? In a security-conscious environment with
> > > > >>>> runtime-dependent topic routing (which is a common feature of
> many
> > > > >> source
> > > > >>>> connectors, such as the Debezium connectors), this seems fairly
> > > > likely.
> > > > >>>>
> > > > >>>> 2) As far as changing the shape of the API goes, I like Artem's
> > idea
> > > > of
> > > > >>>> splitting out the interface based on specific exceptions. This
> may
> > > be
> > > > a
> > > > >>>> little laborious to expand in the future, but if we really want
> to
> > > > >>>> limit the exceptions that we cover with the handler and move
> > slowly
> > > > and
> > > > >>>> cautiously, then IMO it'd be reasonable to reflect that in the
> > > > >>> interface. I
> > > > >>>> also acknowledge that there's no way to completely prevent
> people
> > > from
> > > > >>>> shooting themselves in the foot by implementing the API
> > incorrectly,
> > > > >> but
> > > > >>> I
> > > > >>>> think it's worth it to do what we can--including leveraging the
> > Java
> > > > >>>> language's type system--to help them, so IMO there's value to
> > > > >> eliminating
> > > > >>>> the implicit behavior of failing when a policy returns RETRY
> for a
> > > > >>>> non-retriable error. This can take a variety of shapes and I'm
> not
> > > > >> going
> > > > >>> to
> > > > >>>> insist on anything specific, but I do want to again raise my
> > > concerns
> > > > >>> with
> > > > >>>> the current proposal and request that we find something a little
> > > > >> better.
> > > > >>>>
> > > > >>>> 3) Concerning the default implementation--actually, I meant
> what I
> > > > >> wrote
> > > > >>> :)
> > > > >>>> I don't want a "second" default, I want an implementation of
> this
> > > > >>> interface
> > > > >>>> to be used as the default if no others are specified. The
> behavior
> > > of
> > > > >>> this
> > > > >>>> default implementation would be identical to existing behavior
> (so
> > > > >> there
> > > > >>>> would be no backwards compatibility concerns like the ones
> raised
> > by
> > > > >>>> Matthias), but it would be possible to configure this default
> > > handler
> > > > >>> class
> > > > >>>> to behave differently for a basic set of scenarios. This would
> > > mirror
> > > > >>> (pun
> > > > >>>> intended) the approach we've taken with Mirror Maker 2 and its
> > > > >>>> ReplicationPolicy interface [1]. There is a default
> implementation
> > > > >>>> available [2] that recognizes a handful of basic configuration
> > > > >> properties
> > > > >>>> [3] for simple tweaks, but if users want, they can also
> implement
> > > > their
> > > > >>> own
> > > > >>>> replication policy for more fine-grained logic if those
> properties
> > > > >> aren't
> > > > >>>> flexible enough.
> > > > >>>>
> > > > >>>> More concretely, I'm imagining something like this for the
> > producer
> > > > >>>> exception handler:
> > > > >>>>
> > > > > >>>> - Default implementation class of
> > > > > >>>>   org.apache.kafka.clients.producer.DefaultProducerExceptionHandler
> > > > > >>>> - This class would recognize two properties:
> > > > > >>>>  - drop.invalid.large.records: Boolean property, defaults to false. If
> > > > > >>>>    "false", then causes the handler to return FAIL whenever a
> > > > > >>>>    RecordTooLargeException is encountered; if "true", then causes
> > > > > >>>>    SWALLOW/SKIP/DROP to be returned instead
> > > > > >>>>  - unknown.topic.partition.retry.timeout.ms: Integer property, defaults
> > > > > >>>>    to INT_MAX. Whenever an UnknownTopicOrPartitionException is
> > > > > >>>>    encountered, causes the handler to return FAIL if that record has
> > > > > >>>>    been pending for more than the retry timeout; otherwise, causes
> > > > > >>>>    RETRY to be returned
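
The two properties described above could drive a default handler roughly as follows. This is a sketch only: the class, enum, and method names are hypothetical, and it assumes the handler is told how long the record has been pending (the pendingMs argument), which the thread raises as an open question.

```java
import java.util.Map;

// Hypothetical sketch; names are illustrative and not part of the Kafka API.
enum HandlerResponse { FAIL, SWALLOW, RETRY }

class DefaultHandlerSketch {
    private final boolean dropInvalidLargeRecords;
    private final long unknownTopicRetryTimeoutMs;

    DefaultHandlerSketch(Map<String, String> props) {
        // Defaults follow the proposal: false, and INT_MAX milliseconds.
        this.dropInvalidLargeRecords =
            Boolean.parseBoolean(props.getOrDefault("drop.invalid.large.records", "false"));
        this.unknownTopicRetryTimeoutMs = Long.parseLong(
            props.getOrDefault("unknown.topic.partition.retry.timeout.ms",
                               String.valueOf(Integer.MAX_VALUE)));
    }

    HandlerResponse onRecordTooLarge() {
        return dropInvalidLargeRecords ? HandlerResponse.SWALLOW : HandlerResponse.FAIL;
    }

    // pendingMs: time since the record was handed to send(); assumed to be available.
    HandlerResponse onUnknownTopicOrPartition(long pendingMs) {
        return pendingMs > unknownTopicRetryTimeoutMs
            ? HandlerResponse.FAIL
            : HandlerResponse.RETRY;
    }
}
```

With no properties set, oversized records fail and unknown-topic errors keep being retried, which is meant to mirror the "identical to existing behavior" goal stated above.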
> > > > >>>>
> > > > >>>> I think this is worth addressing now instead of later because it
> > > > forces
> > > > >>> us
> > > > >>>> to evaluate the usefulness of this interface and it addresses a
> > > > >>>> long-standing issue not just with Kafka Connect, but with the
> Java
> > > > >>> producer
> > > > >>>> in general. For reference, here are a few tickets I collected
> > after
> > > > >>> briefly
> > > > >>>> skimming our Jira showing that this is a real pain point for
> > users:
> > > > >>>> https://issues.apache.org/jira/browse/KAFKA-10340,
> > > > >>>> https://issues.apache.org/jira/browse/KAFKA-12990,
> > > > >>>> https://issues.apache.org/jira/browse/KAFKA-13634. Although
> this
> > is
> > > > >>>> frequently reported with Kafka Connect, it applies to anyone who
> > > > >>> configures
> > > > >>>> a producer to use a high retry timeout. I am aware of the
> > > > max.block.ms
> > > > >>>> property, but it's painful and IMO poor behavior to require
> users
> > to
> > > > >>> reduce
> > > > >>>> the value of this property just to handle the single scenario
> when
> > > > >> trying
> > > > >>>> to write to topics that don't exist, since it would also limit
> the
> > > > >> retry
> > > > >>>> timeout for other operations that are legitimately retriable.
> > > > >>>>
> > > > >>>> Raising new points:
> > > > >>>>
> > > > > >>>> 5) I don't see the interplay between this handler and existing
> > > > > >>>> retry-related properties mentioned anywhere in the KIP. I'm assuming that
> > > > > >>>> properties like "retries", "max.block.ms", and "delivery.timeout.ms" would
> > > > > >>>> take precedence over the handler and once they are exhausted, the
> > > > > >>>> record/batch will fail no matter what? If so, it's probably worth briefly
> > > > > >>>> mentioning this (no more than a sentence or two) in the KIP, and if not,
> > > > > >>>> I'm curious what you have in mind.
> > > > >>>>
> > > > >>>> 6) I also wonder if the API provides enough information in its
> > > current
> > > > >>>> form. Would it be possible to provide handlers with some way of
> > > > >> tracking
> > > > >>>> how long a record has been pending for (i.e., how long it's been
> > > since
> > > > >>> the
> > > > >>>> record was provided as an argument to Producer::send)? One way
> to
> > do
> > > > >> this
> > > > >>>> could be to add a method like `onNewRecord(ProducerRecord)` and
> > > > >>>> allow/require the handler to do its own bookkeeping, probably
> > with a
> > > > >>>> matching `onRecordSuccess(ProducerRecord)` method so that the
> > > handler
> > > > >>>> doesn't eat up an ever-increasing amount of memory trying to
> track
> > > > >>> records.
> > > > >>>> An alternative could be to include information about the initial
> > > time
> > > > >> the
> > > > >>>> record was received by the producer and the number of retries
> that
> > > > have
> > > > >>>> been performed for it as parameters in the handle method(s), but
> > I'm
> > > > >> not
> > > > >>>> sure how easy this would be to implement and it might clutter
> > things
> > > > >> up a
> > > > >>>> bit too much.
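
The bookkeeping option in point 6 might look like the sketch below. It is an assumption-laden illustration: onNewRecord and onRecordSuccess are the method names floated above, but the tracker class, its generic record type, and the explicit clock arguments are hypothetical and not part of any Kafka API.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Map;

// Hypothetical sketch of handler-side bookkeeping; not a Kafka API.
class PendingRecordTracker<R> {
    // Identity-based so two equal-looking records are tracked separately.
    private final Map<R, Long> firstSeenMs =
        Collections.synchronizedMap(new IdentityHashMap<>());

    // Called when the record is first passed to send().
    void onNewRecord(R record, long nowMs) {
        firstSeenMs.putIfAbsent(record, nowMs);
    }

    // Removing finished records keeps memory bounded by the in-flight count.
    void onRecordSuccess(R record) {
        firstSeenMs.remove(record);
    }

    // Returns -1 if the record is not being tracked.
    long pendingMs(R record, long nowMs) {
        Long start = firstSeenMs.get(record);
        return start == null ? -1L : nowMs - start;
    }

    int tracked() {
        return firstSeenMs.size();
    }
}
```

A real handler would also need to forget records that end in failure, otherwise the map would grow in exactly the way the message warns about.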
> > > > >>>>
> > > > >>>> 7) A small request--can we add Closeable (or, if you prefer,
> > > > >>> AutoCloseable)
> > > > >>>> as a superinterface for the handler interface?
> > > > >>>>
> > > > >>>> [1] -
> > > > >>>>
> > > > >>>>
> > > > >>>
> > > > >>
> > > >
> > >
> >
> https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/ReplicationPolicy.html
> > > > >>>> [2] -
> > > > >>>>
> > > > >>>>
> > > > >>>
> > > > >>
> > > >
> > >
> >
> https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html
> > > > >>>> [3] -
> > > > >>>>
> > > > >>>>
> > > > >>>
> > > > >>
> > > >
> > >
> >
> https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html#SEPARATOR_CONFIG
> > > > >>>> ,
> > > > >>>>
> > > > >>>>
> > > > >>>
> > > > >>
> > > >
> > >
> >
> https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html#INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG
> > > > >>>>
> > > > >>>> Cheers,
> > > > >>>>
> > > > >>>> Chris
> > > > >>>>
> > > > >>>> On Wed, May 8, 2024 at 12:37 AM Matthias J. Sax <
> mj...@apache.org
> > >
> > > > >>> wrote:
> > > > >>>>
> > > > >>>>> Very interesting discussion.
> > > > >>>>>
> > > > >>>>> Seems a central point is the question about "how generic" we
> > > approach
> > > > >>>>> this, and some people think we need to be conservative and
> others
> > > > >> think
> > > > >>>>> we should try to be as generic as possible.
> > > > >>>>>
> > > > >>>>> Personally, I think following a limited scope for this KIP (by
> > > > >>>>> explicitly saying we only cover RecordTooLarge and
> > > > >>>>> UnknownTopicOrPartition) might be better. We have concrete
> > tickets
> > > > >> that
> > > > >>>>> we address, while for other exception (like authorization) we
> > don't
> > > > >>> know
> > > > >>>>> if people want to handle it to begin with. Boiling the ocean
> > might
> > > > >> not
> > > > >>>>> get us too far, and being somewhat pragmatic might help to move
> > > this
> > > > >>> KIP
> > > > > >>>>> forward. -- I also agree with Justine and Artem, that we want to
> > be
> > > > >>>>> careful anyway to not allow users to break stuff too easily.
> > > > >>>>>
> > > > > >>>>> At the same time, I agree that we should set up this change in a
> > > > >> forward
> > > > >>>>> looking way, and thus having a single generic handler allows us
> > to
> > > > >>> later
> > > > >>>>> extend the handler more easily. This should also simplify
> follow
> > up
> > > > >> KIP
> > > > >>>>> that might add new error cases (I actually mentioned one more
> to
> > > > >> Alieh
> > > > >>>>> already, but we both agreed that it might be best to exclude it
> > > from
> > > > >>> the
> > > > >>>>> KIP right now, to make the 3.8 deadline. Doing a follow up KIP
> is
> > > not
> > > > >>>>> the end of the world.)
> > > > >>>>>
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> @Chris:
> > > > >>>>>
> > > > >>>>> (2) This sounds fair to me, but not sure how "bad" it actually
> > > would
> > > > >>> be?
> > > > >>>>> If the contract is clearly defined, it seems to be fine what
> the
> > > KIP
> > > > >>>>> proposes, and given that such a handler is an expert API, and
> we
> > > can
> > > > >>>>> provide "best practices" (cf my other comment below in [AL1]),
> > > being
> > > > >> a
> > > > > >>>>> little bit pragmatic sounds fine to me.
> > > > >>>>>
> > > > > >>>>> Not sure if I understand Justine's argument on this question?
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> (3) About having a default impl or not. I am fine with adding
> > one,
> > > > >> even
> > > > >>>>> if I am not sure why Connect could not just add its own one and
> > > plug
> > > > >> it
> > > > >>>>> in (and we would add corresponding configs for Connect, but not
> > for
> > > > >> the
> > > > >>>>> producer itself)? For this case, we could also do this as a
> > follow
> > > up
> > > > >>>>> KIP, but happy to include it in this KIP to provide value to
> > > Connect
> > > > >>>>> right away (even if the value might not come right away if we
> > miss
> > > > >> the
> > > > >>>>> 3.8 deadline due to expanded KIP scope...) --  For KS, we would
> > for
> > > > >>> sure
> > > > >>>>> plugin our own impl, and lock down the config such that users
> > > cannot
> > > > >>> set
> > > > >>>>> their own handler on the internal producer to begin with. Might
> > be
> > > > >> good
> > > > >>>>> to elaborate why the producer should have a default? We might
> > > > >> actually
> > > > >>>>> want to add this to the KIP right away?
> > > > >>>>>
> > > > >>>>> The key for a default impl would be, to not change the current
> > > > >>> behavior,
> > > > >>>>> and having no default seems to achieve this. For the two cases
> > you
> > > > > >>>>> mentioned, it's unclear to me what default value on "upper bound on
> > > > > >>>>> retries" for UnknownTopicOrPartitionException we should set?
> Seems
> > > it
> > > > >>>>> would need to be the same as `delivery.timeout.ms`? However,
> if
> > > > >> users
> > > > >>>>> have `delivery.timeout.ms` actually overwritten we would need
> to
> > > set
> > > > >>>>> this config somewhat "dynamic"? Is this feasible? If we
> > hard-code 2
> > > > >>>>> minutes, it might not be backward compatible. I have the
> > impression
> > > > >> we
> > > > >>>>> might introduce some undesired coupling? -- For the "record too
> > > > >> large"
> > > > >>>>> case, the config seems to be boolean and setting it to `false`
> by
> > > > >>>>> default seems to provide backward compatibility.
> > > > >>>>>
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> @Artem:
> > > > >>>>>
> > > > >>>>> [AL1] While I see the point, I would think having a different
> > > > >> callback
> > > > >>>>> for every exception might not really be elegant? In the end,
> the
> > > > >>> handler
> > > > > >>>>> is a very advanced feature anyway, and if it's implemented in
> a
> > > bad
> > > > >>>>> way, well, it's a user error -- we cannot protect users from
> > > > >>> everything.
> > > > > >>>>> To me, a handler like this is to some extent "business logic" and if a
> > > > > >>>>> user gets business logic wrong, it's hard to protect them. -- We would
> > > > > >>>>> of course provide best practice guidance in the JavaDocs, and explain
> > > > > >>>>> that a handler should have explicit `if` statements for stuff it wants
> > > > > >>>>> to handle, and only a single default which returns FAIL.
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> [AL2] Yes, but for KS we would retry at the application layer.
> > Ie,
> > > > >> the
> > > > > >>>>> TX is not completed, we clean up and set up our task from
> scratch,
> > > to
> > > > >>>>> ensure the pending transaction is completed before we resume.
> If
> > > the
> > > > >> TX
> > > > >>>>> was indeed aborted, we would retry from older offset and thus
> > just
> > > > >> hit
> > > > >>>>> the same error again and the loop begins again.
> > > > >>>>>
> > > > >>>>>
> > > > > >>>>> [AL2 cont.] Similar to AL1, I see such a handler to some extent as
> > > > > >>>>> business logic. If a user puts a bad filter condition in their KS app,
> > > > > >>>>> and drops messages, there is nothing we can do about it, and this
> > handler
> > > > >>>>> IMHO, has a similar purpose. This is also the line of thinking
> I
> > > > >> apply
> > > > > >>>>> to EOS, to address Justine's concern about "should we allow to
> > drop
> > > > >> for
> > > > >>>>> EOS", and my answer is "yes", because it's more business logic
> > than
> > > > >>>>> actual error handling IMHO. And by default, we fail... So users
> > > > >> opt-in
> > > > >>>>> to add business logic to drop records. It's an application
> level
> > > > >>>>> decision how to write the code.
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> [AL3] Maybe I misunderstand what you are saying, but to me,
> > > checking
> > > > >>> the
> > > > >>>>> size of the record upfront is exactly what the KIP proposes?
> No?
> > > > >>>>>
> > > > >>>>>
> > > > >>>>>
> > > > > >>>>> @Justine:
> > > > >>>>>
> > > > >>>>>> I saw the sample
> > > > >>>>>> code -- is it just an if statement checking for the error
> before
> > > > >> the
> > > > >>>>>> handler is invoked? That seems a bit fragile.
> > > > >>>>>
> > > > >>>>> What do you mean by fragile? Not sure if I see your point.
> > > > >>>>>
> > > > >>>>>
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> -Matthias
> > > > >>>>>
> > > > >>>>> On 5/7/24 5:33 PM, Artem Livshits wrote:
> > > > >>>>>> Hi Alieh,
> > > > >>>>>>
> > > > >>>>>> Thanks for the KIP.  The motivation talks about very specific
> > > > >> cases,
> > > > >>>> but
> > > > >>>>>> the interface is generic.
> > > > >>>>>>
> > > > >>>>>> [AL1]
> > > > >>>>>> If the interface evolves in the future I think we could have
> the
> > > > >>>>> following
> > > > >>>>>> confusion:
> > > > >>>>>>
> > > > >>>>>> 1. A user implemented SWALLOW action for both
> > > > >> RecordTooLargeException
> > > > >>>> and
> > > > > >>>>>> UnknownTopicOrPartitionException.  For simplicity they just
> > return
> > > > >>>> SWALLOW
> > > > >>>>>> from the function, because it elegantly handles all known
> cases.
> > > > >>>>>> 2. The interface has evolved to support a new exception.
> > > > >>>>>> 3. The user has upgraded their Kafka client.
> > > > >>>>>>
> > > > >>>>>> Now a new kind of error gets dropped on the floor without
> user's
> > > > >>>>> intention
> > > > >>>>>> and it would be super hard to detect and debug.
> > > > >>>>>>
> > > > >>>>>> To avoid the confusion, I think we should use handlers for
> > > specific
> > > > > >>>>>> exceptions.  Then if a new exception is added it won't get silently
> > > > > >>>>>> swallowed because the user would need to add new functionality to
> > > > > >>>>>> handle it.
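
The upgrade scenario in [AL1] can be reproduced with a small Java sketch. Everything here is a hypothetical stand-in: the exception classes are local placeholders rather than Kafka's, and NewlyCoveredError represents whatever exception a later client version might start passing to a single generic handler.

```java
// Hypothetical sketch; the exception classes are local stand-ins, not Kafka's.
enum Action { FAIL, SWALLOW }

class RecordTooLarge extends RuntimeException {}
class UnknownTopicOrPartition extends RuntimeException {}
class NewlyCoveredError extends RuntimeException {} // added by a later client version

class GenericHandlers {
    // Step 1 of the scenario: returns SWALLOW for anything it is given,
    // so the new error is silently dropped after the upgrade.
    static Action catchAll(RuntimeException e) {
        return Action.SWALLOW;
    }

    // A generic handler with explicit cases and a single FAIL default
    // does not swallow errors it was never written for.
    static Action explicit(RuntimeException e) {
        if (e instanceof RecordTooLarge) return Action.SWALLOW;
        if (e instanceof UnknownTopicOrPartition) return Action.SWALLOW;
        return Action.FAIL;
    }
}
```

Per-exception handler methods go one step further than the explicit variant: a new exception would need a new method, so an existing implementation has no code path that could answer for it.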
> > > > >>>>>>
> > > > >>>>>> I also have some higher level comments:
> > > > >>>>>>
> > > > >>>>>> [AL2]
> > > > >>>>>>> it throws a TimeoutException, and the user can only blindly
> > > retry,
> > > > >>>> which
> > > > >>>>>> may result in an infinite retry loop
> > > > >>>>>>
> > > > > >>>>>> If the TimeoutException happens during transactional processing
> > > > > >>>>>> (exactly once is the desired semantics), then the client should not
> > > > > >>>>>> retry when it gets a TimeoutException, because without knowing the
> > > > > >>>>>> reason for the TimeoutException, the client cannot know whether the
> > > > > >>>>>> message actually got produced or not, and retrying the message may
> > > > > >>>>>> result in duplicates.
> > > > >>>>>>
> > > > >>>>>>> The thrown TimeoutException "cuts" the connection to the
> > > > >> underlying
> > > > >>>> root
> > > > >>>>>> cause of missing metadata
> > > > >>>>>>
> > > > >>>>>> Maybe we should fix the error handling and return the proper
> > > > >>> underlying
> > > > >>>>>> message?  Then the application can properly handle the message
> > > > >> based
> > > > >>> on
> > > > >>>>>> preferences.
> > > > >>>>>>
> > > > >>>>>> From the product perspective, it's not clear how safe it is to
> > > > >>> blindly
> > > > >>>>>> ignore UnknownTopicOrPartitionException.  This could lead to
> > > > >>> situations
> > > > >>>>>> when a simple typo could lead to massive data loss (part of
> the
> > > > >> data
> > > > >>>>> would
> > > > >>>>>> effectively be produced to a "black hole" and the user may not
> > > > >> notice
> > > > >>>> it
> > > > >>>>>> for a while).
> > > > >>>>>>
> > > > >>>>>> In which situations would you recommend the user to "black
> hole"
> > > > >>>> messages
> > > > >>>>>> in case of misconfiguration?
> > > > >>>>>>
> > > > >>>>>> [AL3]
> > > > >>>>>>
> > > > >>>>>>> If the custom handler decides on SWALLOW for
> > > > >>> RecordTooLargeException,
> > > > >>>>>>
> > > > > >>>>>> It is my understanding that this KIP proposes functionality that
> > > > > >>>>>> would only be able to SWALLOW a RecordTooLargeException that
> > > > > >>>>>> happens because the
> > > > >>>>>> producer cannot produce the record (if the broker rejects the
> > > > >> batch,
> > > > >>>> the
> > > > >>>>>> error won't get to the handler, because we cannot know which
> > other
> > > > >>>>> records
> > > > >>>>>> get ignored).  In this case, why not just check the locally
> > > > >>> configured
> > > > >>>>> max
> > > > > >>>>>> record size upfront and not produce the record in the first
> > > place?
> > > > >>>>> Maybe
> > > > >>>>>> we can expose a validation function from the producer that
> could
> > > > >>>> validate
> > > > >>>>>> the records locally, so we don't need to produce the record in
> > > > >> order
> > > > >>> to
> > > > >>>>>> know that it's invalid.
> > > > >>>>>>
> > > > >>>>>> -Artem
> > > > >>>>>>
> > > > >>>>>> On Tue, May 7, 2024 at 2:07 PM Justine Olshan
> > > > >>>>> <jols...@confluent.io.invalid>
> > > > >>>>>> wrote:
> > > > >>>>>>
> > > > >>>>>>> Alieh and Chris,
> > > > >>>>>>>
> > > > >>>>>>> Thanks for clarifying 1) but I saw the motivation. I guess I
> > just
> > > > >>>> didn't
> > > > >>>>>>> understand how that would be ensured on the producer side. I
> > saw
> > > > >> the
> > > > >>>>> sample
> > > > >>>>>>> code -- is it just an if statement checking for the error
> > before
> > > > >> the
> > > > >>>>>>> handler is invoked? That seems a bit fragile.
> > > > >>>>>>>
> > > > >>>>>>> Can you clarify what you mean by `since the code does not
> reach
> > > > >> the
> > > > >>> KS
> > > > >>>>>>> interface and breaks somewhere in producer.` If we surfaced
> > this
> > > > >>> error
> > > > >>>>> to
> > > > >>>>>>> the application in a better way would that also be a solution
> > to
> > > > >> the
> > > > >>>>> issue?
> > > > >>>>>>>
> > > > >>>>>>> Justine
> > > > >>>>>>>
> > > > >>>>>>> On Tue, May 7, 2024 at 1:55 PM Alieh Saeedi
> > > > >>>>> <asae...@confluent.io.invalid>
> > > > >>>>>>> wrote:
> > > > >>>>>>>
> > > > >>>>>>>> Hi,
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>> Thank you, Chris and Justine, for the feedback.
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>> @Chris
> > > > >>>>>>>>
> > > > >>>>>>>> 1) Flexibility: it has two meanings. The first meaning is
> the
> > > one
> > > > >>> you
> > > > >>>>>>>> mentioned. We are going to cover more exceptions in the
> > future,
> > > > >> but
> > > > >>>> as
> > > > >>>>>>>> Justine mentioned, we must be very conservative about adding
> > > more
> > > > >>>>>>>> exceptions. Additionally, flexibility mainly means that the
> > user
> > > > >> is
> > > > >>>>> able
> > > > >>>>>>> to
> > > > >>>>>>>> develop their own code. As mentioned in the motivation
> section
> > > > >> and
> > > > >>>> the
> > > > >>>>>>>> examples, sometimes the user decides on dropping a record
> > based
> > > > >> on
> > > > >>>> the
> > > > >>>>>>>> topic, for example.
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>> 2) Defining two separate methods for retriable and
> > non-retriable
> > > > >>>>>>>> exceptions: although the idea is brilliant, the user may
> still
> > > > >>> make a
> > > > >>>>>>>> mistake by implementing the wrong method and see a
> > unexpected
> > > > >>>>>>> behaviour.
> > > > >>>>>>>> For example, he may implement handleRetriable() for
> > > > >>>>>>> RecordTooLargeException
> > > > >>>>>>>> and define SWALLOW for the exception, but in practice, he
> sees
> > > no
> > > > >>>>> change
> > > > >>>>>>> in
> > > > >>>>>>>> default behaviour since he implemented the wrong method. I
> > think
> > > > >> we
> > > > >>>> can
> > > > >>>>>>>> never reduce the user’s mistakes to 0.
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>> 3) Default implementation for Handler: the default behaviour
> > is
> > > > >>>> already
> > > > >>>>>>>> preserved with NO need of implementing any handler or
> setting
> > > the
> > > > >>>>>>>> corresponding config parameter `custom.exception.handler`.
> > What
> > > > >> you
> > > > >>>>> mean
> > > > >>>>>>> is
> > > > >>>>>>>> actually having a second default, which requires having both
> > > > >>>> interface
> > > > >>>>>>> and
> > > > >>>>>>>> config parameters. About UnknownTopicOrPartitionException:
> the
> > > > >>>> producer
> > > > >>>>>>>> already offers the config parameter `max.block.ms` which
> > > > >>> determines
> > > > >>>>> the
> > > > >>>>>>>> duration of retrying. The main purpose of the user who needs
> > the
> > > > >>>>> handler
> > > > >>>>>>> is
> > > > >>>>>>>> to get the root cause of TimeoutException and handle it in
> the
> > > > >> way
> > > > >>> he
> > > > >>>>>>>> intends. The KIP explains the necessity of it for KS users.
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>> 4) Naming issue: By SWALLOW, we meant actually swallow the
> > > error,
> > > > >>>> while
> > > > >>>>>>>> SKIP means skip the record, I think. If it makes sense for
> > more
> > > > >>> ppl,
> > > > >>>> I
> > > > >>>>>>> can
> > > > >>>>>>>> change it to SKIP
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>> @Justine
> > > > >>>>>>>>
> > > > >>>>>>>> 1) was addressed by Chris.
> > > > >>>>>>>>
> > > > >>>>>>>> 2 and 3) The problem is exactly what you mentioned.
> Currently,
> > > > >>> there
> > > > >>>> is
> > > > >>>>>>> no
> > > > >>>>>>>> way to handle these issues application-side. Even KS users
> who
> > > > >>>>> implement
> > > > >>>>>>> KS
> > > > >>>>>>>> ProductionExceptionHandler are not able to handle the
> > exceptions
> > > > >> as
> > > > >>>>> they
> > > > >>>>>>>> intend since the code does not reach the KS interface and
> > breaks
> > > > >>>>>>> somewhere
> > > > >>>>>>>> in producer.
> > > > >>>>>>>>
> > > > >>>>>>>> Cheers,
> > > > >>>>>>>> Alieh
> > > > >>>>>>>>
> > > > >>>>>>>> On Tue, May 7, 2024 at 8:43 PM Chris Egerton <
> > > > >>>> fearthecel...@gmail.com>
> > > > >>>>>>>> wrote:
> > > > >>>>>>>>
> > > > >>>>>>>>> Hi Justine,
> > > > >>>>>>>>>
> > > > >>>>>>>>> The method signatures for the interface are indeed
> > open-ended,
> > > > >> but
> > > > >>>> the
> > > > >>>>>>>> KIP
> > > > >>>>>>>>> states that its uses will be limited. See the motivation
> > > > >> section:
> > > > >>>>>>>>>
> > > > >>>>>>>>>> We believe that the user should be able to develop custom
> > > > >>> exception
> > > > >>>>>>>>> handlers for managing producer exceptions. On the other
> hand,
> > > > >> this
> > > > >>>>> will
> > > > >>>>>>>> be
> > > > >>>>>>>>> an expert-level API, and using that may result in strange
> > > > >>> behaviour
> > > > >>>> in
> > > > >>>>>>>> the
> > > > >>>>>>>>> system, making it hard to find the root cause. Therefore,
> the
> > > > >>> custom
> > > > >>>>>>>>> handler is currently limited to handling
> > > RecordTooLargeException
> > > > >>> and
> > > > >>>>>>>>> UnknownTopicOrPartitionException.
> > > > >>>>>>>>>
> > > > >>>>>>>>> Cheers,
> > > > >>>>>>>>>
> > > > >>>>>>>>> Chris
> > > > >>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>> On Tue, May 7, 2024, 14:37 Justine Olshan
> > > > >>>>> <jols...@confluent.io.invalid
> > > > >>>>>>>>
> > > > >>>>>>>>> wrote:
> > > > >>>>>>>>>
> > > > >>>>>>>>>> Hi Alieh,
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> I was out for KSB and then was also sick. :(
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> To your point 1) Chris, I don't think it is limited to two
> > > > >>> specific
> > > > >>>>>>>>>> scenarios, since the interface accepts a generic
> Exception e
> > > > >> and
> > > > >>>> can
> > > > >>>>>>> be
> > > > >>>>>>>>>> implemented to check if that e is an instanceof any
> > exception.
> > > > >> I
> > > > >>>>>>> didn't
> > > > >>>>>>>>> see
> > > > >>>>>>>>>> anywhere that specific errors are enforced. I'm a bit
> > > concerned
> > > > >>>> about
> > > > >>>>>>>>> this
> > > > > >>>>>>>>>> actually. I'm concerned about the open-endedness and the
> > > > >>> contract
> > > > >>>>>>> we
> > > > >>>>>>>>> have
> > > > >>>>>>>>>> with transactions. We are allowing the client to make
> > > decisions
> > > > >>>> that
> > > > >>>>>>>> are
> > > > >>>>>>>>>> somewhat invisible to the server. As an aside, can we
> build
> > in
> > > > >>> log
> > > > >>>>>>>>> messages
> > > > > >>>>>>>>>> when the handler decides to skip (etc.) a message? I'm really
> > > > >>>> concerned
> > > > >>>>>>>>> about
> > > > >>>>>>>>>> messages being silently dropped.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> I do think Chris's point 2) about retriable vs non
> retriable
> > > > >>> errors
> > > > >>>>>>> is
> > > > > >>>>>>>>>> fair. I'm a bit concerned about skipping an unknown topic
> or
> > > > >>>> partition
> > > > >>>>>>>>>> exception too early, as there are cases where it can be
> > > > >>> transient.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> I'm still a little bit wary of allowing dropping records
> as
> > > > >> part
> > > > >>> of
> > > > >>>>>>> EOS
> > > > >>>>>>>>>> generally as in many cases, these errors signify an issue
> > with
> > > > >>> the
> > > > >>>>>>>>> original
> > > > >>>>>>>>>> data. I understand that streams and connect/mirror maker
> may
> > > > >> have
> > > > >>>>>>>> reasons
> > > > >>>>>>>>>> they want to progress past these messages, but wondering
> if
> > > > >> there
> > > > >>>> is
> > > > >>>>>>> a
> > > > >>>>>>>>> way
> > > > >>>>>>>>>> that can be done application-side. I'm willing to accept
> > this
> > > > >>> sort
> > > > >>>> of
> > > > >>>>>>>>>> proposal if we can make it clear that this sort of thing
> is
> > > > >>>> happening
> > > > >>>>>>>> and
> > > > >>>>>>>>>> we limit the blast radius for what we can do.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Justine
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> On Tue, May 7, 2024 at 9:55 AM Chris Egerton
> > > > >>>> <chr...@aiven.io.invalid
> > > > >>>>>>>>
> > > > >>>>>>>>>> wrote:
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> Hi Alieh,
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Sorry for the delay, I've been out sick. I still have
> some
> > > > >>>> thoughts
> > > > >>>>>>>>> that
> > > > >>>>>>>>>>> I'd like to see addressed before voting.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> 1) If flexibility is the motivation for a pluggable
> > > interface,
> > > > >>> why
> > > > >>>>>>>> are
> > > > >>>>>>>>> we
> > > > >>>>>>>>>>> only limiting the uses for this interface to two very
> > > specific
> > > > >>>>>>>>> scenarios?
> > > > >>>>>>>>>>> Why not also allow, e.g., authorization errors to be
> > handled
> > > > >> as
> > > > >>>>>>> well
> > > > >>>>>>>>>>> (allowing users to drop records destined for some
> > off-limits
> > > > >>>>>>> topics,
> > > > >>>>>>>> or
> > > > >>>>>>>>>>> retry for a limited duration in case there's a delay in
> the
> > > > >>>>>>>> propagation
> > > > >>>>>>>>>> of
> > > > >>>>>>>>>>> ACL updates)? It'd be nice to see some analysis of other
> > > > >> errors
> > > > >>>>>>> that
> > > > >>>>>>>>>> could
> > > > >>>>>>>>>>> be handled with this new API, both to avoid the follow-up
> > > work
> > > > >>> of
> > > > >>>>>>>>> another
> > > > >>>>>>>>>>> KIP to address them in the future, and to make sure that
> > > we're
> > > > >>> not
> > > > >>>>>>>>>> painting
> > > > >>>>>>>>>>> ourselves into a corner with the current API in a way
> that
> > > > >> would
> > > > >>>>>>> make
> > > > >>>>>>>>>>> future modifications difficult.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> 2) Something feels a bit off with how retriable vs.
> > > > >>> non-retriable
> > > > >>>>>>>>> errors
> > > > >>>>>>>>>>> are handled with the interface. Why not introduce two
> > > separate
> > > > >>>>>>>> methods
> > > > >>>>>>>>> to
> > > > >>>>>>>>>>> handle each case separately? That way there's no
> ambiguity
> > or
> > > > >>>>>>>> implicit
> > > > >>>>>>>>>>> behavior when, e.g., attempting to retry on a
> > > > >>>>>>>> RecordTooLargeException.
> > > > >>>>>>>>>> This
> > > > >>>>>>>>>>> could be something like `NonRetriableResponse
> > > > >>>>>>> handle(ProducerRecord,
> > > > >>>>>>>>>>> Exception)` and `RetriableResponse
> > > > >>> handleRetriable(ProducerRecord,
> > > > >>>>>>>>>>> Exception)`, though the exact names and shape can
> obviously
> > > be
> > > > >>>>>>> toyed
> > > > >>>>>>>>>> with a
> > > > >>>>>>>>>>> bit.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> 3) Although the flexibility of a pluggable interface may
> > > > >> benefit
> > > > >>>>>>> some
> > > > >>>>>>>>>>> users' custom producer applications and Kafka Streams
> > > > >>>> applications,
> > > > >>>>>>>> it
> > > > >>>>>>>>>>> comes at significant deployment cost for other
> low-/no-code
> > > > >>>>>>>>> environments,
> > > > >>>>>>>>>>> including but not limited to Kafka Connect and
> MirrorMaker
> > 2.
> > > > >>> Can
> > > > >>>>>>> we
> > > > >>>>>>>>> add
> > > > >>>>>>>>>> a
> > > > >>>>>>>>>>> default implementation of the exception handler that
> allows
> > > > >> for
> > > > >>>>>>> some
> > > > >>>>>>>>>> simple
> > > > >>>>>>>>>>> behavior to be tweaked via configuration property? Two
> > things
> > > > >>> that
> > > > >>>>>>>>> would
> > > > >>>>>>>>>> be
> > > > >>>>>>>>>>> nice to have would be A) an upper bound on the retry time
> > for
> > > > >>>>>>>>>>> unknown-topic-partition exceptions and B) an option to
> drop
> > > > >>>> records
> > > > >>>>>>>>> that
> > > > >>>>>>>>>>> are large enough to trigger a record-too-large exception.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> 4) I'd still prefer to see "SKIP" or "DROP" instead of
> the
> > > > >>>> proposed
> > > > >>>>>>>>>>> "SWALLOW" option, which IMO is opaque and non-obvious,
> > > > >>> especially
> > > > >>>>>>>> when
> > > > >>>>>>>>>>> trying to guess the behavior for retriable errors.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Cheers,
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Chris
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> On Fri, May 3, 2024 at 11:23 AM Alieh Saeedi
> > > > >>>>>>>>>> <asae...@confluent.io.invalid
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>> Hi all,
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> A summary of the KIP and the discussions:
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> The KIP introduces a handler interface for Producer in
> > order
> > > > >> to
> > > > >>>>>>>>> handle
> > > > >>>>>>>>>>> two
> > > > >>>>>>>>>>>> exceptions: RecordTooLargeException and
> > > > >>>>>>>>>> UnknownTopicOrPartitionException.
> > > > >>>>>>>>>>>> The handler handles the exceptions per-record.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> - Do we need this handler?  [Motivation and Examples
> > > > >> sections]
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> RecordTooLargeException: 1) In transactions, the
> producer
> > > > >>>>>>> collects
> > > > >>>>>>>>>>> multiple
> > > > >>>>>>>>>>>> records in batches. Then a RecordTooLargeException
> related
> > > > >> to a
> > > > >>>>>>>>> single
> > > > >>>>>>>>>>>> record leads to failing the entire batch. A custom
> > exception
> > > > >>>>>>>> handler
> > > > >>>>>>>>> in
> > > > >>>>>>>>>>>> this case may decide on dropping the record and
> continuing
> > > > >> the
> > > > >>>>>>>>>>> processing.
> > > > > >>>>>>>>>>>> See Example 1, please. 2) Moreover, in Kafka Streams, a
> > > > >> record
> > > > >>>>>>>> that
> > > > >>>>>>>>> is
> > > > >>>>>>>>>>> too
> > > > >>>>>>>>>>>> large is a poison pill record, and there is no way to
> skip
> > > > >> over
> > > > >>>>>>>> it. A
> > > > >>>>>>>>>>>> handler would allow us to react to this error inside the
> > > > >>>>>>> producer,
> > > > >>>>>>>>>> i.e.,
> > > > >>>>>>>>>>>> local to where the error happens, and thus simplify the
> > > > >> overall
> > > > >>>>>>>> code
> > > > >>>>>>>>>>>> significantly. Please read the Motivation section for
> more
> > > > >>>>>>>>> explanation.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> UnknownTopicOrPartitionException: For this case, the
> > > producer
> > > > >>>>>>>> handles
> > > > >>>>>>>>>>> this
> > > > >>>>>>>>>>>> exception internally and only issues a WARN log about
> > > missing
> > > > >>>>>>>>> metadata
> > > > >>>>>>>>>>> and
> > > > > >>>>>>>>>>>> retries internally. Later, when the producer hits
> > > > > >>>>>>>>> "delivery.timeout.ms"
> > > > >>>>>>>>>>> it
> > > > >>>>>>>>>>>> throws a TimeoutException, and the user can only blindly
> > > > >> retry,
> > > > >>>>>>>> which
> > > > >>>>>>>>>> may
> > > > >>>>>>>>>>>> result in an infinite retry loop. The thrown
> > > TimeoutException
> > > > >>>>>>>> "cuts"
> > > > >>>>>>>>>> the
> > > > >>>>>>>>>>>> connection to the underlying root cause of missing
> > metadata
> > > > >>>>>>> (which
> > > > >>>>>>>>>> could
> > > > >>>>>>>>>>>> indeed be a transient error but is persistent for a
> > > > >>> non-existing
> > > > >>>>>>>>>> topic).
> > > > >>>>>>>>>>>> Thus, there is no programmatic way to break the infinite
> > > > >> retry
> > > > >>>>>>>> loop.
> > > > >>>>>>>>>>> Kafka
> > > > >>>>>>>>>>>> Streams also blindly retries for this case, and the
> > > > >> application
> > > > >>>>>>>> gets
> > > > >>>>>>>>>>> stuck.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> - Having interface vs configuration option: [Motivation,
> > > > >>>>>>> Examples,
> > > > >>>>>>>>> and
> > > > >>>>>>>>>>>> Rejected Alternatives sections]
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Our solution is introducing an interface due to the full
> > > > >>>>>>>> flexibility
> > > > >>>>>>>>>> that
> > > > >>>>>>>>>>>> it offers. Sometimes users, especially Kafka Streams
> ones,
> > > > >>>>>>>> determine
> > > > >>>>>>>>>> the
> > > > >>>>>>>>>>>> handler's behaviour based on the situation. For
> example,
> > > > > >>>>>>>>>>>> facing UnknownTopicOrPartitionException, the user may
> > want
> > > > >> to
> > > > >>>>>>>> raise
> > > > >>>>>>>>> an
> > > > >>>>>>>>>>>> error for some topics but retry it for other topics.
> > Having
> > > a
> > > > >>>>>>>>>>> configuration
> > > > >>>>>>>>>>>> option with a fixed set of possibilities does not serve
> > the
> > > > >>>>>>> user's
> > > > >>>>>>>>>>>> needs. See Example 2, please.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> - Note on RecordTooLargeException: [Public Interfaces
> > > > >> section]
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> If the custom handler decides on SWALLOW for
> > > > >>>>>>>> RecordTooLargeException,
> > > > >>>>>>>>>>> then
> > > > >>>>>>>>>>>> this record will not be a part of the batch of
> > transactions
> > > > >> and
> > > > >>>>>>>> will
> > > > >>>>>>>>>> also
> > > > >>>>>>>>>>>> not be sent to the broker in non-transactional mode. So
> no
> > > > >>>>>>> worries
> > > > >>>>>>>>>> about
> > > > >>>>>>>>>>>> getting a RecordTooLargeException from the broker in
> this
> > > > >> case,
> > > > >>>>>>> as
> > > > >>>>>>>>> the
> > > > >>>>>>>>>>>> record will never ever be sent to the broker. SWALLOW
> > means
> > > > >>> drop
> > > > >>>>>>>> the
> > > > >>>>>>>>>>> record
> > > > >>>>>>>>>>>> and continue/swallow the error.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> - What if the handle() method implements RETRY for
> > > > >>>>>>>>>>> RecordTooLargeException?
> > > > >>>>>>>>>>>> [Proposed Changes section]
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> We have to limit the user to only have FAIL or SWALLOW
> for
> > > > >>>>>>>>>>>> RecordTooLargeException. Actually, RETRY must be equal
> to
> > > > >> FAIL.
> > > > >>>>>>>> This
> > > > >>>>>>>>> is
> > > > > >>>>>>>>>>>> well documented in the javadoc.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> - What if the handle() method of the handler throws an
> > > > >>> exception?
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> The handler is expected to have correct code. If it
> throws
> > > an
> > > > >>>>>>>>>> exception,
> > > > >>>>>>>>>>>> everything fails.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> This is a PoC PR <
> > > https://github.com/apache/kafka/pull/15846
> > > > >>>
> > > > >>>>>>> ONLY
> > > > >>>>>>>>> for
> > > > >>>>>>>>>>>> RecordTooLargeException. The code changes related to
> > > > >>>>>>>>>>>> UnknownTopicOrPartitionException will be added to this
> PR
> > > > >>> LATER.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Looking forward to your feedback again.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Cheers,
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Alieh
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> On Thu, Apr 25, 2024 at 11:46 PM Kirk True <
> > > > >> k...@kirktrue.pro>
> > > > >>>>>>>>> wrote:
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Hi Alieh,
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Thanks for the updates!
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Comments inline...
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> On Apr 25, 2024, at 1:10 PM, Alieh Saeedi
> > > > >>>>>>>>>>> <asae...@confluent.io.INVALID
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Hi all,
> > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Thanks a lot for the constructive feedback!
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Addressing some of the main concerns:
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> - The `RecordTooLargeException` can be thrown by
> broker,
> > > > >>>>>>>> producer
> > > > >>>>>>>>>> and
> > > > >>>>>>>>>>>>>> consumer. Of course, the `ProducerExceptionHandler`
> > > > >> interface
> > > > >>>>>>>> is
> > > > >>>>>>>>>>>>> introduced
> > > > >>>>>>>>>>>>>> to affect only the exceptions thrown from the
> producer.
> > > > >> This
> > > > >>>>>>>> KIP
> > > > >>>>>>>>>> very
> > > > >>>>>>>>>>>>>> specifically means to provide a possibility to manage
> > the
> > > > >>>>>>>>>>>>>> `RecordTooLargeException` thrown from the
> > Producer.send()
> > > > >>>>>>>> method.
> > > > >>>>>>>>>>>> Please
> > > > >>>>>>>>>>>>>> see “Proposed Changes” section for more clarity. I
> > > > >>>>>>> investigated
> > > > >>>>>>>>> the
> > > > >>>>>>>>>>>> issue
> > > > >>>>>>>>>>>>>> there thoroughly. I hope it can explain the concern
> > about
> > > > >> how
> > > > >>>>>>>> we
> > > > >>>>>>>>>>> handle
> > > > >>>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>> errors as well.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> - The problem with Callback: Methods of Callback are
> > > called
> > > > >>>>>>>> when
> > > > >>>>>>>>>> the
> > > > >>>>>>>>>>>>> record
> > > > >>>>>>>>>>>>>> sent to the server is acknowledged, while this is not
> > the
> > > > >>>>>>>> desired
> > > > >>>>>>>>>>> time
> > > > >>>>>>>>>>>>> for
> > > > >>>>>>>>>>>>>> all exceptions. We intend to handle exceptions
> > beforehand.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> I guess it makes sense to keep the expectation for when
> > > > >>>>>>> Callback
> > > > >>>>>>>> is
> > > > >>>>>>>>>>>>> invoked as-is vs. shoehorning more into it.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> - What if the custom handler returns RETRY for
> > > > >>>>>>>>>>>>> `RecordTooLargeException`? I
> > > > >>>>>>>>>>>>>> assume changing the producer configuration at runtime
> is
> > > > >>>>>>>>> possible.
> > > > >>>>>>>>>> If
> > > > >>>>>>>>>>>> so,
> > > > >>>>>>>>>>>>>> RETRY for a too large record is valid because maybe in
> > the
> > > > >>>>>>> next
> > > > >>>>>>>>>> try,
> > > > >>>>>>>>>>>> the
> > > > > >>>>>>>>>>>>>> too large record is not a poison pill any more. I am not
> > 100%
> > > > >>>>>>> sure
> > > > >>>>>>>>>> about
> > > > >>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>> technical details, though. Otherwise, we can consider
> > the
> > > > >>>>>>> RETRY
> > > > >>>>>>>>> as
> > > > >>>>>>>>>>> FAIL
> > > > >>>>>>>>>>>>> for
> > > > >>>>>>>>>>>>>> this exception. Another solution would be to consider
> a
> > > > >>>>>>>> constant
> > > > >>>>>>>>>>> number
> > > > >>>>>>>>>>>>> of
> > > > >>>>>>>>>>>>>> times for RETRY which can be useful for other
> exceptions
> > > as
> > > > >>>>>>>> well.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> It’s not presently possible to change the configuration
> > of
> > > > >> an
> > > > >>>>>>>>>> existing
> > > > >>>>>>>>>>>>> Producer at runtime. So if a record hits a
> > > > >>>>>>>> RecordTooLargeException
> > > > >>>>>>>>>>> once,
> > > > >>>>>>>>>>>> no
> > > > >>>>>>>>>>>>> amount of retrying (with the current Producer) will
> > change
> > > > >>> that
> > > > >>>>>>>>> fact.
> > > > >>>>>>>>>>> So
> > > > >>>>>>>>>>>>> I’m still a little stuck on how to handle a response of
> > > > >> RETRY
> > > > >>>>>>> for
> > > > >>>>>>>>> an
> > > > >>>>>>>>>>>>> “oversized” record.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> - What if the handle() method itself throws an
> > exception?
> > > I
> > > > >>>>>>>> think
> > > > >>>>>>>>>>>>>> rationally and pragmatically, the behaviour must be
> > > exactly
> > > > >>>>>>>> like
> > > > >>>>>>>>>> when
> > > > >>>>>>>>>>>> no
> > > > >>>>>>>>>>>>>> custom handler is defined since the user actually did
> > not
> > > > >>>>>>> have
> > > > >>>>>>>> a
> > > > >>>>>>>>>>>> working
> > > > >>>>>>>>>>>>>> handler.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> I’m not convinced that ignoring an errant handler is
> the
> > > > >> right
> > > > >>>>>>>>>> choice.
> > > > >>>>>>>>>>> It
> > > > >>>>>>>>>>>>> then becomes a silent failure that might have
> > > repercussions,
> > > > >>>>>>>>>> depending
> > > > >>>>>>>>>>> on
> > > > >>>>>>>>>>>>> the business logic. A user would have to proactively
> > trawl
> > > > >>>>>>>> through
> > > > >>>>>>>>>> the
> > > > >>>>>>>>>>>>> logs for WARN/ERROR messages to catch it.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Throwing a hard error is pretty draconian, though…
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> - Why not use config parameters instead of an
> interface?
> > > As
> > > > >>>>>>>>>> explained
> > > > >>>>>>>>>>>> in
> > > > >>>>>>>>>>>>>> the “Rejected Alternatives” section, we assume that
> the
> > > > >>>>>>> handler
> > > > >>>>>>>>>> will
> > > > >>>>>>>>>>> be
> > > > >>>>>>>>>>>>>> used for a greater number of exceptions in the future.
> > > > >>>>>>>> Defining a
> > > > >>>>>>>>>>>>>> configuration parameter for each exception may make
> the
> > > > >>>>>>>>>>> configuration a
> > > > >>>>>>>>>>>>> bit
> > > > >>>>>>>>>>>>>> messy. Moreover, the handler offers more flexibility.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Agreed that the logic-via-configuration approach is
> weird
> > > > >> and
> > > > >>>>>>>>>> limiting.
> > > > >>>>>>>>>>>>> Forget I ever suggested it ;)
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> I’d think additional background in the Motivation
> section
> > > > >>> would
> > > > >>>>>>>>> help
> > > > >>>>>>>>>> me
> > > > >>>>>>>>>>>>> understand how users might use this feature beyond a)
> > > > >> skipping
> > > > >>>>>>>>>>>> “oversized”
> > > > >>>>>>>>>>>>> records, and b) not retrying missing topics.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Small change:
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> -ProductionExceptionHandlerResponse -> Response for
> > > brevity
> > > > >>>>>>> and
> > > > >>>>>>>>>>>>> simplicity.
> > > > >>>>>>>>>>>>>> Could’ve been HandlerResponse too I think!
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> The name change sounds good to me.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Thanks Alieh!
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> I thank you all again for your useful
> > > > >> questions/suggestions.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> I would be happy to hear more of your concerns, as
> > stated
> > > > >> in
> > > > >>>>>>>> some
> > > > >>>>>>>>>>>>> feedback.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Cheers,
> > > > >>>>>>>>>>>>>> Alieh
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> On Wed, Apr 24, 2024 at 12:31 AM Justine Olshan
> > > > >>>>>>>>>>>>>> <jols...@confluent.io.invalid> wrote:
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Thanks Alieh for the updates.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> I'm a little concerned about the design pattern here.
> > It
> > > > >>>>>>> seems
> > > > >>>>>>>>>> like
> > > > >>>>>>>>>>> we
> > > > >>>>>>>>>>>>> want
> > > > >>>>>>>>>>>>>>> specific usages, but we are packaging it as a generic
> > > > >>>>>>> handler.
> > > > >>>>>>>>>>>>>>> I think we tried to narrow down on the specific
> errors
> > we
> > > > >>>>>>> want
> > > > >>>>>>>>> to
> > > > >>>>>>>>>>>>> handle,
> > > > >>>>>>>>>>>>>>> but it feels a little clunky as we have a generic
> thing
> > > > >> for
> > > > >>>>>>>> two
> > > > >>>>>>>>>>>> specific
> > > > >>>>>>>>>>>>>>> errors.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> I'm wondering if we are using the right patterns to
> > solve
> > > > >>>>>>>> these
> > > > >>>>>>>>>>>>> problems. I
> > > > >>>>>>>>>>>>>>> agree though that we will need something more than
> the
> > > > >> error
> > > > >>>>>>>>>> classes
> > > > >>>>>>>>>>>> I'm
> > > > >>>>>>>>>>>>>>> proposing if we want to have different handling be
> > > > >>>>>>>> configurable.
> > > > >>>>>>>>>>>>>>> My concern is that the open-endedness of a handler
> > means
> > > > >>>>>>> that
> > > > >>>>>>>> we
> > > > >>>>>>>>>> are
> > > > >>>>>>>>>>>>>>> creating more problems than we are solving. It is
> still
> > > > >>>>>>>> unclear
> > > > >>>>>>>>> to
> > > > >>>>>>>>>>> me
> > > > >>>>>>>>>>>>> how
> > > > >>>>>>>>>>>>>>> we expect to handle the errors. Perhaps we could
> > include
> > > > >> an
> > > > >>>>>>>>>> example?
> > > > >>>>>>>>>>>> It
> > > > >>>>>>>>>>>>>>> seems like there is a specific use case in mind and
> > maybe
> > > > >> we
> > > > >>>>>>>> can
> > > > >>
>
