Thanks Andrew. Done :)

@Chris: I changed the config parameter type from boolean to integer, which
defines the timeout for retrying. I thought reusing `max.block.ms` was not
reasonable as you mentioned.
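
To make the timeout idea concrete, here is a minimal sketch of how an
integer timeout could drive the retry decision. All names in it
(`RetryTimeoutSketch`, `onUnknownTopicOrPartition`, and the values of
`RetriableResponse`) are hypothetical placeholders for illustration, not
the KIP's final API, and it does not use any Kafka classes.

```java
public class RetryTimeoutSketch {
    // Hypothetical response type; the name follows the thread, the values are assumed.
    enum RetriableResponse { RETRY, FAIL, SWALLOW }

    // Assumed to come from the new integer (timeout) config parameter, in milliseconds.
    private final long retryTimeoutMs;

    RetryTimeoutSketch(long retryTimeoutMs) {
        this.retryTimeoutMs = retryTimeoutMs;
    }

    // Keep retrying while the record has been pending no longer than the timeout;
    // once the timeout is exceeded, give up and fail the record.
    RetriableResponse onUnknownTopicOrPartition(long pendingMs) {
        return pendingMs > retryTimeoutMs ? RetriableResponse.FAIL : RetriableResponse.RETRY;
    }

    public static void main(String[] args) {
        RetryTimeoutSketch sketch = new RetryTimeoutSketch(30_000L);
        System.out.println(sketch.onUnknownTopicOrPartition(5_000L));
        System.out.println(sketch.onUnknownTopicOrPartition(45_000L));
    }
}
```

With a boolean, the only choices would be "always retry" or "never retry";
a timeout lets a freshly created topic become visible before the record is
failed.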

So if the KIP looks good, let's skip to the good part ;-) VOTING :)

Bests,
Alieh





On Tue, May 14, 2024 at 12:26 PM Andrew Schofield <andrew_schofi...@live.com>
wrote:

> Hi Alieh,
> Just one final comment.
>
> [AJS5] Existing classes use Retriable, not Retryable. For example:
>
> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/errors/RetriableException.html
>
> I suggest RetriableResponse and NonRetriableResponse.
>
> Thanks,
> Andrew
>
> > On 13 May 2024, at 23:17, Alieh Saeedi <asae...@confluent.io.INVALID>
> wrote:
> >
> > Hi all,
> >
> >
> > Thanks for all the valid points you listed.
> >
> >
> > KIP updates and addressing concerns:
> >
> >
> > 1) The KIP now suggests two Response types: `RetryableResponse` and
> > `NonRetryableResponse`
> >
> >
> > 2) `custom.exception.handler` is changed to
> `custom.exception.handler.class`
> >
> >
> > 3) The KIP clarifies that `In the case of an implemented handler for the
> > specified exception, the handler takes precedence.`
> >
> >
> > 4)  There is now a `default` implementation for both handle() methods.
> >
> >
> > 5)  @Chris: for `UnknownTopicOrPartition`, the default is already
> retrying
> > for 60s. (In fact, the default value of `max.block.ms`). If the handler
> > instructs to FAIL or SWALLOW, there will be no retry, and if the handler
> > instructs to RETRY, that will be the default behavior, which follows the
> > values in already existing config parameters such as `max.block.ms`.
> Does
> > that make sense?
> >
> >
> > Hope the changes and explanations are convincing :)
> >
> >
> > Cheers,
> >
> > Alieh
> >
> > On Mon, May 13, 2024 at 6:40 PM Justine Olshan
> <jols...@confluent.io.invalid>
> > wrote:
> >
> >> Oh I see. The type isn't the error type but a newly defined type for the
> >> response. Makes sense and works for me.
> >>
> >> Justine
> >>
> >> On Mon, May 13, 2024 at 9:13 AM Chris Egerton <fearthecel...@gmail.com>
> >> wrote:
> >>
> >>> If we have dedicated methods for each kind of exception
> >>> (handleRecordTooLarge, handleUnknownTopicOrPartition, etc.), doesn't
> that
> >>> provide sufficient constraint? I'm not suggesting we eliminate these
> >>> methods, just that we change their return types to something more
> >> flexible.
> >>>
> >>> On Mon, May 13, 2024, 12:07 Justine Olshan
> <jols...@confluent.io.invalid
> >>>
> >>> wrote:
> >>>
> >>>> I'm not sure I agree with the Retriable and NonRetriableResponse
> >> comment.
> >>>> This doesn't limit the blast radius or enforce certain errors are
> used.
> >>>> I think we might disagree on how controlled these interfaces can be...
> >>>>
> >>>> Justine
> >>>>
> >>>> On Mon, May 13, 2024 at 8:40 AM Chris Egerton <chr...@aiven.io.invalid
> >>>
> >>>> wrote:
> >>>>
> >>>>> Hi Alieh,
> >>>>>
> >>>>> Thanks for the updates! I just have a few more thoughts:
> >>>>>
> >>>>> - I don't think a boolean property is sufficient to dictate retries
> >> for
> >>>>> unknown topic partitions, though. These errors can occur if a topic
> >> has
> >>>>> just been created, which can occur if, for example, automatic topic
> >>>>> creation is enabled for a multi-task connector. This is why I
> >> proposed
> >>> a
> >>>>> timeout instead of a boolean (and see my previous email for why
> >>> reducing
> >>>>> max.block.ms for a producer is not a viable alternative). If it
> >> helps,
> >>>> one
> >>>>> way to reproduce this yourself is to add the line
> >>>>> `fooProps.put(TASKS_MAX_CONFIG, "10");` to the integration test here:
> >>>>>
> >>>>>
> >>>>
> >>>
> >>
> https://github.com/apache/kafka/blob/5439914c32fa00d634efa7219699f1bc21add839/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java#L134
> >>>>> and then check the logs afterward for messages like "Error while
> >>> fetching
> >>>>> metadata with correlation id <n> :
> >>>> {foo-topic=UNKNOWN_TOPIC_OR_PARTITION}".
> >>>>>
> >>>>> - I also don't think we need custom XxxResponse enums for every
> >>> possible
> >>>>> method; it seems like this will lead to a lot of duplication and
> >>>> cognitive
> >>>>> overhead if we want to expand the error handler in the future.
> >>> Something
> >>>>> more flexible like RetriableResponse and NonRetriableResponse could
> >>>>> suffice.
> >>>>>
> >>>>> - Finally, the KIP still doesn't state how the handler will or won't
> >>> take
> >>>>> precedence over existing retry properties. If I set `retries` or `
> >>>>> delivery.timeout.ms` or `max.block.ms` to low values, will that
> >> cause
> >>>>> retries to cease even if my custom handler would otherwise keep
> >>> returning
> >>>>> RETRY for an error?
> >>>>>
> >>>>> Cheers,
> >>>>>
> >>>>> Chris
> >>>>>
> >>>>> On Mon, May 13, 2024 at 11:02 AM Andrew Schofield <
> >>>>> andrew_schofi...@live.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi Alieh,
> >>>>>> Just a few more comments on the KIP. It is looking much less risky
> >>> now
> >>>>> the
> >>>>>> scope
> >>>>>> is tighter.
> >>>>>>
> >>>>>> [AJS1] It would be nice to have default implementations of the
> >> handle
> >>>>>> methods
> >>>>>> so an implementor would not need to implement both themselves.
> >>>>>>
> >>>>>> [AJS2] Producer configurations which are class names usually end in
> >>>>>> “.class”.
> >>>>>> I suggest “custom.exception.handler.class”.
> >>>>>>
> >>>>>> [AJS3] If I implemented a handler, and I set a non-default value
> >> for
> >>>> one
> >>>>>> of the
> >>>>>> new configurations, what happens? I would expect that the handler
> >>> takes
> >>>>>> precedence. I wasn’t quite clear what “the control will follow the
> >>>>> handler
> >>>>>> instructions” meant.
> >>>>>>
> >>>>>> [AJS4] Because you now have an enum for the
> >>>>>> RecordTooLargeExceptionResponse,
> >>>>>> I don’t think you need to state in the comment for
> >>>>>> ProducerExceptionHandler that
> >>>>>> RETRY will be interpreted as FAIL.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Andrew
> >>>>>>
> >>>>>>> On 13 May 2024, at 14:53, Alieh Saeedi
> >>> <asae...@confluent.io.INVALID
> >>>>>
> >>>>>> wrote:
> >>>>>>>
> >>>>>>> Hi all,
> >>>>>>>
> >>>>>>>
> >>>>>>> Thanks for the very interesting discussion during my PTO.
> >>>>>>>
> >>>>>>>
> >>>>>>> KIP updates and addressing concerns:
> >>>>>>>
> >>>>>>>
> >>>>>>> 1) Two handle() methods are defined in ProducerExceptionHandler
> >> for
> >>>> the
> >>>>>> two
> >>>>>>> exceptions with different input parameters so that we have
> >>>>>>> handle(RecordTooLargeException e, ProducerRecord record) and
> >>>>>>> handle(UnknownTopicOrPartitionException e, ProducerRecord record)
> >>>>>>>
> >>>>>>>
> >>>>>>> 2) The ProducerExceptionHandler extends `Closeable` as well.
> >>>>>>>
> >>>>>>>
> >>>>>>> 3) The KIP suggests having two more configuration parameters with
> >>>>> boolean
> >>>>>>> values:
> >>>>>>>
> >>>>>>> - `drop.invalid.large.records` with a default value of `false`
> >> for
> >>>>>>> swallowing too large records.
> >>>>>>>
> >>>>>>> - `retry.unknown.topic.partition` with a default value of `true`
> >>> that
> >>>>>>> performs RETRY for `max.block.ms` ms, encountering the
> >>>>>>> UnknownTopicOrPartitionException.
> >>>>>>>
> >>>>>>>
> >>>>>>> Hope the main concerns are addressed so that we can go forward
> >> with
> >>>>>> voting.
> >>>>>>>
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>>
> >>>>>>> Alieh
> >>>>>>>
> >>>>>>> On Thu, May 9, 2024 at 11:25 PM Artem Livshits
> >>>>>>> <alivsh...@confluent.io.invalid> wrote:
> >>>>>>>
> >>>>>>>> Hi Matthias,
> >>>>>>>>
> >>>>>>>>> [AL1] While I see the point, I would think having a different
> >>>>> callback
> >>>>>>>> for every exception might not really be elegant?
> >>>>>>>>
> >>>>>>>> I'm not sure how to assess the level of elegance of the
> >> proposal,
> >>>> but
> >>>>> I
> >>>>>> can
> >>>>>>>> comment on the technical characteristics:
> >>>>>>>>
> >>>>>>>> 1. Having specific interfaces that codify the logic that is
> >>>> currently
> >>>>>>>> prescribed in the comments reduces the chance of making a
> >> mistake.
> >>>>>>>> Comments may get ignored, misunderstood, etc., but if the
> >>> contract
> >>>> is
> >>>>>>>> codified, the compiler will help to enforce the contract.
> >>>>>>>> 2. Given that the logic is trickier than it seems (the
> >>>>> record-too-large
> >>>>>> is
> >>>>>>>> an example that can easily confuse someone who's not intimately
> >>>>> familiar
> >>>>>>>> with the nuances of the batching logic), having a few more
> >>> hoops
> >>>> to
> >>>>>> jump through
> >>>>>>>> would give a greater chance that whoever tries to add new
> >> cases
> >>>>> pauses
> >>>>>>>> and thinks a bit more.
> >>>>>>>> 3. As Justine pointed out, having different methods will be a
> >>> forcing
> >>>>>>>> function to go through a KIP rather than smuggle new cases
> >> through
> >>>>>>>> implementation.
> >>>>>>>> 4. Sort of a consequence of the previous 3 -- all those things
> >>>> reduce
> >>>>>> the
> >>>>>>>> chance of someone writing the code that works with 2 errors and
> >>> then
> >>>>>> when
> >>>>>>>> more errors are added in the future will suddenly incorrectly
> >>> ignore
> >>>>> new
> >>>>>>>> errors (the example I gave in the previous email).
> >>>>>>>>
> >>>>>>>>> [AL2 cont.] Similar to AL1, I see such a handler to some extent
> >>> as
> >>>>>>>> business logic. If a user puts a bad filter condition in their
> >> KS
> >>>> app,
> >>>>>> and
> >>>>>>>> drops messages
> >>>>>>>>
> >>>>>>>> I agree that there is always a chance to get a bug and lose
> >>>> messages,
> >>>>>> but
> >>>>>>>> there is generally a separation of concerns with different
> >> risk
> >>>>>> profiles:
> >>>>>>>> the filtering logic may be more rigorously tested and rarely
> >>> changed
> >>>>>> (say
> >>>>>>>> an application developer does it), but setting the topics to
> >>> produce
> >>>>>> may be
> >>>>>>>> done via configuration (e.g. a user of the application does it)
> >>> and
> >>>>> it's
> >>>>>>>> generally an expectation that users would get an error when
> >>>>>> configuration
> >>>>>>>> is incorrect.
> >>>>>>>>
> >>>>>>>> What could be worse is that UnknownTopicOrPartitionException can
> >>> be
> >>>> an
> >>>>>>>> intermittent error, i.e. with a generally correct configuration,
> >>>> there
> >>>>>>>> could be a metadata propagation problem on the cluster and then a
> >>>> random
> >>>>>> set
> >>>>>>>> of records could get lost.
> >>>>>>>>
> >>>>>>>>> [AL3] Maybe I misunderstand what you are saying, but to me,
> >>>> checking
> >>>>>> the
> >>>>>>>> size of the record upfront is exactly what the KIP proposes? No?
> >>>>>>>>
> >>>>>>>> It achieves the same result but solves it differently, my
> >>> proposal:
> >>>>>>>>
> >>>>>>>> 1. Application checks the validity of a record (maybe via a new
> >>>>>>>> validateRecord method) before producing it, and can just exclude
> >>> it
> >>>> or
> >>>>>>>> return an error to the user.
> >>>>>>>> 2. Application produces the record -- at this point there are no
> >>>>> records
> >>>>>>>> that could return record too large, they were either skipped at
> >>>> step 1
> >>>>>> or
> >>>>>>>> we didn't get here because step 1 failed.
> >>>>>>>>
> >>>>>>>> Vs. KIP's proposal
> >>>>>>>>
> >>>>>>>> 1. Application produces the record.
> >>>>>>>> 2. Application gets a callback.
> >>>>>>>> 3. Application returns the action on how to proceed.
> >>>>>>>>
> >>>>>>>> The advantage of the former is the clarity of semantics -- the
> >>>> record
> >>>>> is
> >>>>>>>> invalid (property of the record, not a function of server state
> >> or
> >>>>>> server
> >>>>>>>> configuration) and we can clearly know that it is the record
> >> that
> >>> is
> >>>>> bad
> >>>>>>>> and can never succeed.
> >>>>>>>>
> >>>>>>>> The KIP-proposed way actually has a very tricky point: it
> >> actually
> >>>>>> handles
> >>>>>>>> a subset of record-too-large exceptions.  The broker can return
> >>>>>>>> record-too-large and reject the whole batch (but we don't want
> >> to
> >>>>> ignore
> >>>>>>>> those because then we can skip random records that just happened
> >>> to
> >>>> be
> >>>>>> in
> >>>>>>>> the same batch), in some sense we use the same error for 2
> >>> different
> >>>>>>>> conditions and understanding that requires pretty deep
> >>> understanding
> >>>>> of
> >>>>>>>> Kafka internals.
> >>>>>>>>
> >>>>>>>> -Artem
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Wed, May 8, 2024 at 9:47 AM Justine Olshan
> >>>>>> <jols...@confluent.io.invalid
> >>>>>>>>>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> My concern with respect to it being fragile: the code that
> >>> ensures
> >>>>> the
> >>>>>>>>> error type is internal to the producer. Someone may see it and
> >>>> say, I
> >>>>>>>> want
> >>>>>>>>> to add such and such error. This looks like internal code, so I
> >>>> don't
> >>>>>>>> need
> >>>>>>>>> a KIP, and then they can change it to whatever they want
> >> thinking
> >>>> it
> >>>>> is
> >>>>>>>>> within the typical kafka improvement protocol.
> >>>>>>>>>
> >>>>>>>>> Relying on an internal change to enforce an external API is
> >>> fragile
> >>>>> in
> >>>>>> my
> >>>>>>>>> opinion. That's why I sort of agreed with Artem with enforcing
> >>> the
> >>>>>> error
> >>>>>>>> in
> >>>>>>>>> the method signature -- part of the public API.
> >>>>>>>>>
> >>>>>>>>> Chris's comments on requiring more information to handler again
> >>>> makes
> >>>>>> me
> >>>>>>>>> wonder if we are solving a problem of lack of information at
> >> the
> >>>>>>>>> application level with a more powerful solution than we need.
> >>> (Ie,
> >>>> if
> >>>>>> we
> >>>>>>>>> had more information, could the application close and restart
> >> the
> >>>>>>>>> transaction rather than having to drop records) But I am happy
> >> to
> >>>>>>>>> compromise with a handler that we can agree is sufficiently
> >>>>> controlled
> >>>>>>>> and
> >>>>>>>>> documented.
> >>>>>>>>>
> >>>>>>>>> Justine
> >>>>>>>>>
> >>>>>>>>> On Wed, May 8, 2024 at 7:20 AM Chris Egerton
> >>>> <chr...@aiven.io.invalid
> >>>>>>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Alieh,
> >>>>>>>>>>
> >>>>>>>>>> Continuing prior discussions:
> >>>>>>>>>>
> >>>>>>>>>> 1) Regarding the "flexibility" discussion, my overarching
> >> point
> >>> is
> >>>>>>>> that I
> >>>>>>>>>> don't see the point in allowing for this kind of pluggable
> >> logic
> >>>>>>>> without
> >>>>>>>>>> also covering more scenarios. Take example 2 in the KIP: if
> >>> we're
> >>>>>> going
> >>>>>>>>> to
> >>>>>>>>>> implement retries only on "important" topics when a topic
> >>>> partition
> >>>>>>>> isn't
> >>>>>>>>>> found, why wouldn't we also want to be able to do this for
> >> other
> >>>>>>>> errors?
> >>>>>>>>>> Again, taking authorization errors as an example, why wouldn't
> >>> we
> >>>>> want
> >>>>>>>> to
> >>>>>>>>>> be able to fail when we can't write to "important" topics
> >>> because
> >>>>> the
> >>>>>>>>>> producer principal lacks sufficient ACLs, and drop the record
> >> if
> >>>> the
> >>>>>>>>> topic
> >>>>>>>>>> isn't "important"? In a security-conscious environment with
> >>>>>>>>>> runtime-dependent topic routing (which is a common feature of
> >>> many
> >>>>>>>> source
> >>>>>>>>>> connectors, such as the Debezium connectors), this seems
> >> fairly
> >>>>>> likely.
> >>>>>>>>>>
> >>>>>>>>>> 2) As far as changing the shape of the API goes, I like
> >> Artem's
> >>>> idea
> >>>>>> of
> >>>>>>>>>> splitting out the interface based on specific exceptions. This
> >>> may
> >>>>> be
> >>>>>> a
> >>>>>>>>>> little laborious to expand in the future, but if we really
> >> want
> >>> to
> >>>>>>>>>> limit the exceptions that we cover with the handler and move
> >>>> slowly
> >>>>>> and
> >>>>>>>>>> cautiously, then IMO it'd be reasonable to reflect that in the
> >>>>>>>>> interface. I
> >>>>>>>>>> also acknowledge that there's no way to completely prevent
> >>> people
> >>>>> from
> >>>>>>>>>> shooting themselves in the foot by implementing the API
> >>>> incorrectly,
> >>>>>>>> but
> >>>>>>>>> I
> >>>>>>>>>> think it's worth it to do what we can--including leveraging
> >> the
> >>>> Java
> >>>>>>>>>> language's type system--to help them, so IMO there's value to
> >>>>>>>> eliminating
> >>>>>>>>>> the implicit behavior of failing when a policy returns RETRY
> >>> for a
> >>>>>>>>>> non-retriable error. This can take a variety of shapes and I'm
> >>> not
> >>>>>>>> going
> >>>>>>>>> to
> >>>>>>>>>> insist on anything specific, but I do want to again raise my
> >>>>> concerns
> >>>>>>>>> with
> >>>>>>>>>> the current proposal and request that we find something a
> >> little
> >>>>>>>> better.
> >>>>>>>>>>
> >>>>>>>>>> 3) Concerning the default implementation--actually, I meant
> >>> what I
> >>>>>>>> wrote
> >>>>>>>>> :)
> >>>>>>>>>> I don't want a "second" default, I want an implementation of
> >>> this
> >>>>>>>>> interface
> >>>>>>>>>> to be used as the default if no others are specified. The
> >>> behavior
> >>>>> of
> >>>>>>>>> this
> >>>>>>>>>> default implementation would be identical to existing behavior
> >>> (so
> >>>>>>>> there
> >>>>>>>>>> would be no backwards compatibility concerns like the ones
> >>> raised
> >>>> by
> >>>>>>>>>> Matthias), but it would be possible to configure this default
> >>>>> handler
> >>>>>>>>> class
> >>>>>>>>>> to behave differently for a basic set of scenarios. This would
> >>>>> mirror
> >>>>>>>>> (pun
> >>>>>>>>>> intended) the approach we've taken with Mirror Maker 2 and its
> >>>>>>>>>> ReplicationPolicy interface [1]. There is a default
> >>> implementation
> >>>>>>>>>> available [2] that recognizes a handful of basic configuration
> >>>>>>>> properties
> >>>>>>>>>> [3] for simple tweaks, but if users want, they can also
> >>> implement
> >>>>>> their
> >>>>>>>>> own
> >>>>>>>>>> replication policy for more fine-grained logic if those
> >>> properties
> >>>>>>>> aren't
> >>>>>>>>>> flexible enough.
> >>>>>>>>>>
> >>>>>>>>>> More concretely, I'm imagining something like this for the
> >>>> producer
> >>>>>>>>>> exception handler:
> >>>>>>>>>>
> >>>>>>>>>> - Default implementation class
> >>>>>>>>>> of
> >>>> org.apache.kafka.clients.producer.DefaultProducerExceptionHandler
> >>>>>>>>>> - This class would recognize two properties:
> >>>>>>>>>> - drop.invalid.large.records: Boolean property, defaults to
> >>>> false.
> >>>>> If
> >>>>>>>>>> "false", then causes the handler to return FAIL whenever
> >>>>>>>>>> a RecordTooLargeException is encountered; if "true", then
> >> causes
> >>>>>>>>>> SWALLOW/SKIP/DROP to be returned instead
> >>>>>>>>>> - unknown.topic.partition.retry.timeout.ms: Integer
> >> property,
> >>>>>>>> defaults
> >>>>>>>>>> to
> >>>>>>>>>> INT_MAX. Whenever an UnknownTopicOrPartitionException is
> >>>>> encountered,
> >>>>>>>>>> causes the handler to return FAIL if that record has been
> >>> pending
> >>>>> for
> >>>>>>>>> more
> >>>>>>>>>> than the retry timeout; otherwise, causes RETRY to be returned
> >>>>>>>>>>
> >>>>>>>>>> I think this is worth addressing now instead of later because
> >> it
> >>>>>> forces
> >>>>>>>>> us
> >>>>>>>>>> to evaluate the usefulness of this interface and it addresses
> >> a
> >>>>>>>>>> long-standing issue not just with Kafka Connect, but with the
> >>> Java
> >>>>>>>>> producer
> >>>>>>>>>> in general. For reference, here are a few tickets I collected
> >>>> after
> >>>>>>>>> briefly
> >>>>>>>>>> skimming our Jira showing that this is a real pain point for
> >>>> users:
> >>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-10340,
> >>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-12990,
> >>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-13634. Although
> >>> this
> >>>> is
> >>>>>>>>>> frequently reported with Kafka Connect, it applies to anyone
> >> who
> >>>>>>>>> configures
> >>>>>>>>>> a producer to use a high retry timeout. I am aware of the
> >>>>>> max.block.ms
> >>>>>>>>>> property, but it's painful and IMO poor behavior to require
> >>> users
> >>>> to
> >>>>>>>>> reduce
> >>>>>>>>>> the value of this property just to handle the single scenario
> >>> when
> >>>>>>>> trying
> >>>>>>>>>> to write to topics that don't exist, since it would also limit
> >>> the
> >>>>>>>> retry
> >>>>>>>>>> timeout for other operations that are legitimately retriable.
> >>>>>>>>>>
> >>>>>>>>>> Raising new points:
> >>>>>>>>>>
> >>>>>>>>>> 5) I don't see the interplay between this handler and existing
> >>>>>>>>>> retry-related properties mentioned anywhere in the KIP. I'm
> >>>> assuming
> >>>>>>>> that
> >>>>>>>>>> properties like "retries", "max.block.ms", and "
> >>>> delivery.timeout.ms
> >>>>> "
> >>>>>>>>> would
> >>>>>>>>>> take precedence over the handler and once they are exhausted,
> >>> the
> >>>>>>>>>> record/batch will fail no matter what? If so, it's probably
> >>> worth
> >>>>>>>> briefly
> >>>>>>>>>> mentioning this (no more than a sentence or two) in the KIP,
> >> and
> >>>> if
> >>>>>>>> not,
> >>>>>>>>>> I'm curious what you have in mind.
> >>>>>>>>>>
> >>>>>>>>>> 6) I also wonder if the API provides enough information in its
> >>>>> current
> >>>>>>>>>> form. Would it be possible to provide handlers with some way
> >> of
> >>>>>>>> tracking
> >>>>>>>>>> how long a record has been pending for (i.e., how long it's
> >> been
> >>>>> since
> >>>>>>>>> the
> >>>>>>>>>> record was provided as an argument to Producer::send)? One way
> >>> to
> >>>> do
> >>>>>>>> this
> >>>>>>>>>> could be to add a method like `onNewRecord(ProducerRecord)`
> >> and
> >>>>>>>>>> allow/require the handler to do its own bookkeeping, probably
> >>>> with a
> >>>>>>>>>> matching `onRecordSuccess(ProducerRecord)` method so that the
> >>>>> handler
> >>>>>>>>>> doesn't eat up an ever-increasing amount of memory trying to
> >>> track
> >>>>>>>>> records.
> >>>>>>>>>> An alternative could be to include information about the
> >> initial
> >>>>> time
> >>>>>>>> the
> >>>>>>>>>> record was received by the producer and the number of retries
> >>> that
> >>>>>> have
> >>>>>>>>>> been performed for it as parameters in the handle method(s),
> >> but
> >>>> I'm
> >>>>>>>> not
> >>>>>>>>>> sure how easy this would be to implement and it might clutter
> >>>> things
> >>>>>>>> up a
> >>>>>>>>>> bit too much.
> >>>>>>>>>>
> >>>>>>>>>> 7) A small request--can we add Closeable (or, if you prefer,
> >>>>>>>>> AutoCloseable)
> >>>>>>>>>> as a superinterface for the handler interface?
> >>>>>>>>>>
> >>>>>>>>>> [1] -
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/ReplicationPolicy.html
> >>>>>>>>>> [2] -
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html
> >>>>>>>>>> [3] -
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html#SEPARATOR_CONFIG
> >>>>>>>>>> ,
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html#INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG
> >>>>>>>>>>
> >>>>>>>>>> Cheers,
> >>>>>>>>>>
> >>>>>>>>>> Chris
> >>>>>>>>>>
> >>>>>>>>>> On Wed, May 8, 2024 at 12:37 AM Matthias J. Sax <
> >>> mj...@apache.org
> >>>>>
> >>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Very interesting discussion.
> >>>>>>>>>>>
> >>>>>>>>>>> Seems a central point is the question about "how generic" we
> >>>>> approach
> >>>>>>>>>>> this, and some people think we need to be conservative and
> >>> others
> >>>>>>>> think
> >>>>>>>>>>> we should try to be as generic as possible.
> >>>>>>>>>>>
> >>>>>>>>>>> Personally, I think following a limited scope for this KIP
> >> (by
> >>>>>>>>>>> explicitly saying we only cover RecordTooLarge and
> >>>>>>>>>>> UnknownTopicOrPartition) might be better. We have concrete
> >>>> tickets
> >>>>>>>> that
> >>>>>>>>>>> we address, while for other exception (like authorization) we
> >>>> don't
> >>>>>>>>> know
> >>>>>>>>>>> if people want to handle it to begin with. Boiling the ocean
> >>>> might
> >>>>>>>> not
> >>>>>>>>>>> get us too far, and being somewhat pragmatic might help to
> >> move
> >>>>> this
> >>>>>>>>> KIP
> >>>>>>>>>>> forward. -- I also agree with Justine and Artem, that we want
> >> to
> >>>> be
> >>>>>>>>>>> careful anyway to not allow users to break stuff too easily.
> >>>>>>>>>>>
> >>>>>>>>>>> At the same time, I agree that we should set up this change
> >> in a
> >>>>>>>> forward
> >>>>>>>>>>> looking way, and thus having a single generic handler allows
> >> us
> >>>> to
> >>>>>>>>> later
> >>>>>>>>>>> extend the handler more easily. This should also simplify
> >>> follow
> >>>> up
> >>>>>>>> KIP
> >>>>>>>>>>> that might add new error cases (I actually mentioned one more
> >>> to
> >>>>>>>> Alieh
> >>>>>>>>>>> already, but we both agreed that it might be best to exclude
> >> it
> >>>>> from
> >>>>>>>>> the
> >>>>>>>>>>> KIP right now, to make the 3.8 deadline. Doing a follow up
> >> KIP
> >>> is
> >>>>> not
> >>>>>>>>>>> the end of the world.)
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> @Chris:
> >>>>>>>>>>>
> >>>>>>>>>>> (2) This sounds fair to me, but not sure how "bad" it
> >> actually
> >>>>> would
> >>>>>>>>> be?
> >>>>>>>>>>> If the contract is clearly defined, it seems to be fine what
> >>> the
> >>>>> KIP
> >>>>>>>>>>> proposes, and given that such a handler is an expert API, and
> >>> we
> >>>>> can
> >>>>>>>>>>> provide "best practices" (cf my other comment below in
> >> [AL1]),
> >>>>> being
> >>>>>>>> a
> >>>>>>>>>>> little bit pragmatic sounds fine to me.
> >>>>>>>>>>>
> >>>>>>>>>>> Not sure if I understand Justine's argument on this question?
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> (3) About having a default impl or not. I am fine with adding
> >>>> one,
> >>>>>>>> even
> >>>>>>>>>>> if I am not sure why Connect could not just add its own one
> >> and
> >>>>> plug
> >>>>>>>> it
> >>>>>>>>>>> in (and we would add corresponding configs for Connect, but
> >> not
> >>>> for
> >>>>>>>> the
> >>>>>>>>>>> producer itself)? For this case, we could also do this as a
> >>>> follow
> >>>>> up
> >>>>>>>>>>> KIP, but happy to include it in this KIP to provide value to
> >>>>> Connect
> >>>>>>>>>>> right away (even if the value might not come right away if we
> >>>> miss
> >>>>>>>> the
> >>>>>>>>>>> 3.8 deadline due to expanded KIP scope...) --  For KS, we
> >> would
> >>>> for
> >>>>>>>>> sure
> >>>>>>>>>>> plug in our own impl, and lock down the config such that users
> >>>>> cannot
> >>>>>>>>> set
> >>>>>>>>>>> their own handler on the internal producer to begin with.
> >> Might
> >>>> be
> >>>>>>>> good
> >>>>>>>>>>> to elaborate why the producer should have a default? We might
> >>>>>>>> actually
> >>>>>>>>>>> want to add this to the KIP right away?
> >>>>>>>>>>>
> >>>>>>>>>>> The key for a default impl would be, to not change the
> >> current
> >>>>>>>>> behavior,
> >>>>>>>>>>> and having no default seems to achieve this. For the two
> >> cases
> >>>> you
> >>>>>>>>>>> mentioned, it's unclear to me what default value on "upper
> >>> bound
> >>>> on
> >>>>>>>>>>> retries" for UnknownTopicOrPartitionException we should set?
> >>> Seems
> >>>>> it
> >>>>>>>>>>> would need to be the same as `delivery.timeout.ms`? However,
> >>> if
> >>>>>>>> users
> >>>>>>>>>>> have `delivery.timeout.ms` actually overwritten we would
> >> need
> >>> to
> >>>>> set
> >>>>>>>>>>> this config somewhat "dynamic"? Is this feasible? If we
> >>>> hard-code 2
> >>>>>>>>>>> minutes, it might not be backward compatible. I have the
> >>>> impression
> >>>>>>>> we
> >>>>>>>>>>> might introduce some undesired coupling? -- For the "record
> >> too
> >>>>>>>> large"
> >>>>>>>>>>> case, the config seems to be boolean and setting it to
> >> `false`
> >>> by
> >>>>>>>>>>> default seems to provide backward compatibility.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> @Artem:
> >>>>>>>>>>>
> >>>>>>>>>>> [AL1] While I see the point, I would think having a different
> >>>>>>>> callback
> >>>>>>>>>>> for every exception might not really be elegant? In the end,
> >>> the
> >>>>>>>>> handler
> >>>>>>>>>>> is a very advanced feature anyway, and if it's implemented
> >> in
> >>> a
> >>>>> bad
> >>>>>>>>>>> way, well, it's a user error -- we cannot protect users from
> >>>>>>>>> everything.
> >>>>>>>>>>> To me, a handler like this, is to some extent "business
> >> logic"
> >>>> and
> >>>>>>>> if a
> >>>>>>>>>>> user gets business logic wrong, it's hard to protect them. --
> >>> We
> >>>>>>>> would
> >>>>>>>>>>> of course provide best practice guidance in the Javadocs, and
> >>>>> explain
> >>>>>>>>>>> that a handler should have explicit `if` statements for stuff
> >>> it
> >>>>> wants
> >>>>>>>>> to
> >>>>>>>>>>> handle, and only a single default which returns FAIL.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> [AL2] Yes, but for KS we would retry at the application
> >> layer.
> >>>> Ie,
> >>>>>>>> the
> >>>>>>>>>>> TX is not completed, we clean up and set up our task from
> >>> scratch,
> >>>>> to
> >>>>>>>>>>> ensure the pending transaction is completed before we resume.
> >>> If
> >>>>> the
> >>>>>>>> TX
> >>>>>>>>>>> was indeed aborted, we would retry from older offset and thus
> >>>> just
> >>>>>>>> hit
> >>>>>>>>>>> the same error again and the loop begins again.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> [AL2 cont.] Similar to AL1, I see such a handler to some
> >> extent
> >>>> as
> >>>>>>>>>>> business logic. If a user puts a bad filter condition in
> >> their
> >>> KS
> >>>>>>>> app,
> >>>>>>>>>>> and drops messages, there is nothing we can do about it, and this
> >>>> handler
> >>>>>>>>>>> IMHO, has a similar purpose. This is also the line of
> >> thinking
> >>> I
> >>>>>>>> apply
> >>>>>>>>>>> to EOS, to address Justine's concern about "should we allow to
> >>>> drop
> >>>>>>>> for
> >>>>>>>>>>> EOS", and my answer is "yes", because it's more business
> >> logic
> >>>> than
> >>>>>>>>>>> actual error handling IMHO. And by default, we fail... So
> >> users
> >>>>>>>> opt-in
> >>>>>>>>>>> to add business logic to drop records. It's an application
> >>> level
> >>>>>>>>>>> decision how to write the code.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> [AL3] Maybe I misunderstand what you are saying, but to me,
> >>>>> checking
> >>>>>>>>> the
> >>>>>>>>>>> size of the record upfront is exactly what the KIP proposes?
> >>> No?
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> @Justine:
> >>>>>>>>>>>
> >>>>>>>>>>>> I saw the sample
> >>>>>>>>>>>> code -- is it just an if statement checking for the error
> >>> before
> >>>>>>>> the
> >>>>>>>>>>>> handler is invoked? That seems a bit fragile.
> >>>>>>>>>>>
> >>>>>>>>>>> What do you mean by fragile? Not sure if I see your point.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> -Matthias
> >>>>>>>>>>>
> >>>>>>>>>>> On 5/7/24 5:33 PM, Artem Livshits wrote:
> >>>>>>>>>>>> Hi Alieh,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks for the KIP.  The motivation talks about very
> >> specific
> >>>>>>>> cases,
> >>>>>>>>>> but
> >>>>>>>>>>>> the interface is generic.
> >>>>>>>>>>>>
> >>>>>>>>>>>> [AL1]
> >>>>>>>>>>>> If the interface evolves in the future I think we could have
> >>> the
> >>>>>>>>>>> following
> >>>>>>>>>>>> confusion:
> >>>>>>>>>>>>
> >>>>>>>>>>>> 1. A user implemented SWALLOW action for both
> >>>>>>>> RecordTooLargeException
> >>>>>>>>>> and
> >>>>>>>>>>>> UnknownTopicOrPartitionException.  For simplicity they just
> >>>> return
> >>>>>>>>>> SWALLOW
> >>>>>>>>>>>> from the function, because it elegantly handles all known
> >>> cases.
> >>>>>>>>>>>> 2. The interface has evolved to support a new exception.
> >>>>>>>>>>>> 3. The user has upgraded their Kafka client.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Now a new kind of error gets dropped on the floor without
> >>> user's
> >>>>>>>>>>> intention
> >>>>>>>>>>>> and it would be super hard to detect and debug.
> >>>>>>>>>>>>
> >>>>>>>>>>>> To avoid the confusion, I think we should use handlers for
> >>>>> specific
> >>>>>>>>>>>> exceptions.  Then if a new exception is added it won't get
> >>>>> silently
> >>>>>>>>>>> swallowed
> >>>>>>>>>>>> because the user would need to add new functionality to
> >> handle
> >>>> it.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I also have some higher level comments:
> >>>>>>>>>>>>
> >>>>>>>>>>>> [AL2]
> >>>>>>>>>>>>> it throws a TimeoutException, and the user can only blindly
> >>>>> retry,
> >>>>>>>>>> which
> >>>>>>>>>>>> may result in an infinite retry loop
> >>>>>>>>>>>>
> >>>>>>>>>>>> If the TimeoutException happens during transactional
> >>> processing
> >>>>>>>>>> (exactly
> >>>>>>>>>>>> once is the desired semantics), then the client should not
> >>> retry
> >>>>>>>> when
> >>>>>>>>>> it
> >>>>>>>>>>>> gets TimeoutException because without knowing the reason for
> >>>>>>>>>>>> TimeoutExceptions, the client cannot know whether the
> >> message
> >>>> got
> >>>>>>>>>>> actually
> >>>>>>>>>>>> produced or not and retrying the message may result in
> >>>>> duplicates.
> >>>>>>>>>>>>
> >>>>>>>>>>>>> The thrown TimeoutException "cuts" the connection to the
> >>>>>>>> underlying
> >>>>>>>>>> root
> >>>>>>>>>>>> cause of missing metadata
> >>>>>>>>>>>>
> >>>>>>>>>>>> Maybe we should fix the error handling and return the proper
> >>>>>>>>> underlying
> >>>>>>>>>>>> message?  Then the application can properly handle the
> >> message
> >>>>>>>> based
> >>>>>>>>> on
> >>>>>>>>>>>> preferences.
> >>>>>>>>>>>>
> >>>>>>>>>>>> From the product perspective, it's not clear how safe it is
> >> to
> >>>>>>>>> blindly
> >>>>>>>>>>>> ignore UnknownTopicOrPartitionException.  This could lead to
> >>>>>>>>> situations
> >>>>>>>>>>>> when a simple typo could lead to massive data loss (part of
> >>> the
> >>>>>>>> data
> >>>>>>>>>>> would
> >>>>>>>>>>>> effectively be produced to a "black hole" and the user may
> >> not
> >>>>>>>> notice
> >>>>>>>>>> it
> >>>>>>>>>>>> for a while).
> >>>>>>>>>>>>
> >>>>>>>>>>>> In which situations would you recommend the user to "black
> >>> hole"
> >>>>>>>>>> messages
> >>>>>>>>>>>> in case of misconfiguration?
> >>>>>>>>>>>>
> >>>>>>>>>>>> [AL3]
> >>>>>>>>>>>>
> >>>>>>>>>>>>> If the custom handler decides on SWALLOW for
> >>>>>>>>> RecordTooLargeException,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Is my understanding correct that this KIP proposes
> >>> functionality
> >>>>>>>> that
> >>>>>>>>>>> would
> >>>>>>>>>>>> only be able to SWALLOW RecordTooLargeException that happen
> >>>>> because
> >>>>>>>>> the
> >>>>>>>>>>>> producer cannot produce the record (if the broker rejects
> >> the
> >>>>>>>> batch,
> >>>>>>>>>> the
> >>>>>>>>>>>> error won't get to the handler, because we cannot know which
> >>>> other
> >>>>>>>>>>> records
> >>>>>>>>>>>> get ignored).  In this case, why not just check the locally
> >>>>>>>>> configured
> >>>>>>>>>>> max
> >>>>>>>>>>>> record size upfront and not produce the record in the first
> >>>>> place?
> >>>>>>>>>>> Maybe
> >>>>>>>>>>>> we can expose a validation function from the producer that
> >>> could
> >>>>>>>>>> validate
> >>>>>>>>>>>> the records locally, so we don't need to produce the record
> >> in
> >>>>>>>> order
> >>>>>>>>> to
> >>>>>>>>>>>> know that it's invalid.
> >>>>>>>>>>>>
> >>>>>>>>>>>> -Artem
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Tue, May 7, 2024 at 2:07 PM Justine Olshan
> >>>>>>>>>>> <jols...@confluent.io.invalid>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Alieh and Chris,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for clarifying 1) but I saw the motivation. I guess
> >> I
> >>>> just
> >>>>>>>>>> didn't
> >>>>>>>>>>>>> understand how that would be ensured on the producer side.
> >> I
> >>>> saw
> >>>>>>>> the
> >>>>>>>>>>> sample
> >>>>>>>>>>>>> code -- is it just an if statement checking for the error
> >>>> before
> >>>>>>>> the
> >>>>>>>>>>>>> handler is invoked? That seems a bit fragile.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Can you clarify what you mean by `since the code does not
> >>> reach
> >>>>>>>> the
> >>>>>>>>> KS
> >>>>>>>>>>>>> interface and breaks somewhere in producer.` If we surfaced
> >>>> this
> >>>>>>>>> error
> >>>>>>>>>>> to
> >>>>>>>>>>>>> the application in a better way would that also be a
> >> solution
> >>>> to
> >>>>>>>> the
> >>>>>>>>>>> issue?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Justine
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Tue, May 7, 2024 at 1:55 PM Alieh Saeedi
> >>>>>>>>>>> <asae...@confluent.io.invalid>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thank you, Chris and Justine, for the feedback.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> @Chris
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 1) Flexibility: it has two meanings. The first meaning is
> >>> the
> >>>>> one
> >>>>>>>>> you
> >>>>>>>>>>>>>> mentioned. We are going to cover more exceptions in the
> >>>> future,
> >>>>>>>> but
> >>>>>>>>>> as
> >>>>>>>>>>>>>> Justine mentioned, we must be very conservative about
> >> adding
> >>>>> more
> >>>>>>>>>>>>>> exceptions. Additionally, flexibility mainly means that
> >> the
> >>>> user
> >>>>>>>> is
> >>>>>>>>>>> able
> >>>>>>>>>>>>> to
> >>>>>>>>>>>>>> develop their own code. As mentioned in the motivation
> >>> section
> >>>>>>>> and
> >>>>>>>>>> the
> >>>>>>>>>>>>>> examples, sometimes the user decides on dropping a record
> >>>> based
> >>>>>>>> on
> >>>>>>>>>> the
> >>>>>>>>>>>>>> topic, for example.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 2) Defining two separate methods for retriable and
> >>>> non-retriable
> >>>>>>>>>>>>>> exceptions: although the idea is brilliant, the user may
> >>> still
> >>>>>>>>> make a
> >>>>>>>>>>>>>> mistake by implementing the wrong method and see
> >>>> unexpected
> >>>>>>>>>>>>> behaviour.
> >>>>>>>>>>>>>> For example, he may implement handleRetriable() for
> >>>>>>>>>>>>> RecordTooLargeException
> >>>>>>>>>>>>>> and define SWALLOW for the exception, but in practice, he
> >>> sees
> >>>>> no
> >>>>>>>>>>> change
> >>>>>>>>>>>>> in
> >>>>>>>>>>>>>> default behaviour since he implemented the wrong method. I
> >>>> think
> >>>>>>>> we
> >>>>>>>>>> can
> >>>>>>>>>>>>>> never reduce the user’s mistakes to 0.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 3) Default implementation for Handler: the default
> >> behaviour
> >>>> is
> >>>>>>>>>> already
> >>>>>>>>>>>>>> preserved with NO need of implementing any handler or
> >>> setting
> >>>>> the
> >>>>>>>>>>>>>> corresponding config parameter `custom.exception.handler`.
> >>>> What
> >>>>>>>> you
> >>>>>>>>>>> mean
> >>>>>>>>>>>>> is
> >>>>>>>>>>>>>> actually having a second default, which requires having
> >> both
> >>>>>>>>>> interface
> >>>>>>>>>>>>> and
> >>>>>>>>>>>>>> config parameters. About UnknownTopicOrPartitionException:
> >>> the
> >>>>>>>>>> producer
> >>>>>>>>>>>>>> already offers the config parameter `max.block.ms` which
> >>>>>>>>> determines
> >>>>>>>>>>> the
> >>>>>>>>>>>>>> duration of retrying. The main purpose of the user who
> >> needs
> >>>> the
> >>>>>>>>>>> handler
> >>>>>>>>>>>>> is
> >>>>>>>>>>>>>> to get the root cause of TimeoutException and handle it in
> >>> the
> >>>>>>>> way
> >>>>>>>>> he
> >>>>>>>>>>>>>> intends. The KIP explains the necessity of it for KS
> >> users.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 4) Naming issue: By SWALLOW, we meant actually swallow the
> >>>>> error,
> >>>>>>>>>> while
> >>>>>>>>>>>>>> SKIP means skip the record, I think. If it makes sense for
> >>>> more
> >>>>>>>>> ppl,
> >>>>>>>>>> I
> >>>>>>>>>>>>> can
> >>>>>>>>>>>>>> change it to SKIP
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> @Justine
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 1) was addressed by Chris.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 2 and 3) The problem is exactly what you mentioned.
> >>> Currently,
> >>>>>>>>> there
> >>>>>>>>>> is
> >>>>>>>>>>>>> no
> >>>>>>>>>>>>>> way to handle these issues application-side. Even KS users
> >>> who
> >>>>>>>>>>> implement
> >>>>>>>>>>>>> KS
> >>>>>>>>>>>>>> ProductionExceptionHandler are not able to handle the
> >>>> exceptions
> >>>>>>>> as
> >>>>>>>>>>> they
> >>>>>>>>>>>>>> intend since the code does not reach the KS interface and
> >>>> breaks
> >>>>>>>>>>>>> somewhere
> >>>>>>>>>>>>>> in producer.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>> Alieh
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Tue, May 7, 2024 at 8:43 PM Chris Egerton <
> >>>>>>>>>> fearthecel...@gmail.com>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi Justine,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> The method signatures for the interface are indeed
> >>>> open-ended,
> >>>>>>>> but
> >>>>>>>>>> the
> >>>>>>>>>>>>>> KIP
> >>>>>>>>>>>>>>> states that its uses will be limited. See the motivation
> >>>>>>>> section:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> We believe that the user should be able to develop
> >> custom
> >>>>>>>>> exception
> >>>>>>>>>>>>>>> handlers for managing producer exceptions. On the other
> >>> hand,
> >>>>>>>> this
> >>>>>>>>>>> will
> >>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>> an expert-level API, and using that may result in strange
> >>>>>>>>> behaviour
> >>>>>>>>>> in
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>> system, making it hard to find the root cause. Therefore,
> >>> the
> >>>>>>>>> custom
> >>>>>>>>>>>>>>> handler is currently limited to handling
> >>>>> RecordTooLargeException
> >>>>>>>>> and
> >>>>>>>>>>>>>>> UnknownTopicOrPartitionException.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Chris
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Tue, May 7, 2024, 14:37 Justine Olshan
> >>>>>>>>>>> <jols...@confluent.io.invalid
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi Alieh,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I was out for KSB and then was also sick. :(
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> To your point 1) Chris, I don't think it is limited to
> >> two
> >>>>>>>>> specific
> >>>>>>>>>>>>>>>> scenarios, since the interface accepts a generic
> >>> Exception e
> >>>>>>>> and
> >>>>>>>>>> can
> >>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>> implemented to check if that e is an instanceof any
> >>>> exception.
> >>>>>>>> I
> >>>>>>>>>>>>> didn't
> >>>>>>>>>>>>>>> see
> >>>>>>>>>>>>>>>> anywhere that specific errors are enforced. I'm a bit
> >>>>> concerned
> >>>>>>>>>> about
> >>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>> actually. I'm concerned about the open-endedness and
> >> the
> >>>>>>>>> contract
> >>>>>>>>>>>>> we
> >>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>> with transactions. We are allowing the client to make
> >>>>> decisions
> >>>>>>>>>> that
> >>>>>>>>>>>>>> are
> >>>>>>>>>>>>>>>> somewhat invisible to the server. As an aside, can we
> >>> build
> >>>> in
> >>>>>>>>> log
> >>>>>>>>>>>>>>> messages
> >>>>>>>>>>>>>>>> when the handler decides to skip etc a message. I'm
> >> really
> >>>>>>>>>> concerned
> >>>>>>>>>>>>>>> about
> >>>>>>>>>>>>>>>> messages being silently dropped.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I do think Chris's point 2) about retriable vs non
> >>> retriable
> >>>>>>>>> errors
> >>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>> fair. I'm a bit concerned about skipping an unknown topic
> >>> or
> >>>>>>>>>> partition
> >>>>>>>>>>>>>>>> exception too early, as there are cases where it can be
> >>>>>>>>> transient.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I'm still a little bit wary of allowing dropping records
> >>> as
> >>>>>>>> part
> >>>>>>>>> of
> >>>>>>>>>>>>> EOS
> >>>>>>>>>>>>>>>> generally as in many cases, these errors signify an
> >> issue
> >>>> with
> >>>>>>>>> the
> >>>>>>>>>>>>>>> original
> >>>>>>>>>>>>>>>> data. I understand that streams and connect/mirror maker
> >>> may
> >>>>>>>> have
> >>>>>>>>>>>>>> reasons
> >>>>>>>>>>>>>>>> they want to progress past these messages, but wondering
> >>> if
> >>>>>>>> there
> >>>>>>>>>> is
> >>>>>>>>>>>>> a
> >>>>>>>>>>>>>>> way
> >>>>>>>>>>>>>>>> that can be done application-side. I'm willing to accept
> >>>> this
> >>>>>>>>> sort
> >>>>>>>>>> of
> >>>>>>>>>>>>>>>> proposal if we can make it clear that this sort of thing
> >>> is
> >>>>>>>>>> happening
> >>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>> we limit the blast radius for what we can do.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Justine
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Tue, May 7, 2024 at 9:55 AM Chris Egerton
> >>>>>>>>>> <chr...@aiven.io.invalid
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Hi Alieh,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Sorry for the delay, I've been out sick. I still have
> >>> some
> >>>>>>>>>> thoughts
> >>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>> I'd like to see addressed before voting.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 1) If flexibility is the motivation for a pluggable
> >>>>> interface,
> >>>>>>>>> why
> >>>>>>>>>>>>>> are
> >>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>> only limiting the uses for this interface to two very
> >>>>> specific
> >>>>>>>>>>>>>>> scenarios?
> >>>>>>>>>>>>>>>>> Why not also allow, e.g., authorization errors to be
> >>>> handled
> >>>>>>>> as
> >>>>>>>>>>>>> well
> >>>>>>>>>>>>>>>>> (allowing users to drop records destined for some
> >>>> off-limits
> >>>>>>>>>>>>> topics,
> >>>>>>>>>>>>>> or
> >>>>>>>>>>>>>>>>> retry for a limited duration in case there's a delay in
> >>> the
> >>>>>>>>>>>>>> propagation
> >>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>> ACL updates)? It'd be nice to see some analysis of
> >> other
> >>>>>>>> errors
> >>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>> could
> >>>>>>>>>>>>>>>>> be handled with this new API, both to avoid the
> >> follow-up
> >>>>> work
> >>>>>>>>> of
> >>>>>>>>>>>>>>> another
> >>>>>>>>>>>>>>>>> KIP to address them in the future, and to make sure
> >> that
> >>>>> we're
> >>>>>>>>> not
> >>>>>>>>>>>>>>>> painting
> >>>>>>>>>>>>>>>>> ourselves into a corner with the current API in a way
> >>> that
> >>>>>>>> would
> >>>>>>>>>>>>> make
> >>>>>>>>>>>>>>>>> future modifications difficult.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 2) Something feels a bit off with how retriable vs.
> >>>>>>>>> non-retriable
> >>>>>>>>>>>>>>> errors
> >>>>>>>>>>>>>>>>> are handled with the interface. Why not introduce two
> >>>>> separate
> >>>>>>>>>>>>>> methods
> >>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>> handle each case separately? That way there's no
> >>> ambiguity
> >>>> or
> >>>>>>>>>>>>>> implicit
> >>>>>>>>>>>>>>>>> behavior when, e.g., attempting to retry on a
> >>>>>>>>>>>>>> RecordTooLargeException.
> >>>>>>>>>>>>>>>> This
> >>>>>>>>>>>>>>>>> could be something like `NonRetriableResponse
> >>>>>>>>>>>>> handle(ProducerRecord,
> >>>>>>>>>>>>>>>>> Exception)` and `RetriableResponse
> >>>>>>>>> handleRetriable(ProducerRecord,
> >>>>>>>>>>>>>>>>> Exception)`, though the exact names and shape can
> >>> obviously
> >>>>> be
> >>>>>>>>>>>>> toyed
> >>>>>>>>>>>>>>>> with a
> >>>>>>>>>>>>>>>>> bit.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 3) Although the flexibility of a pluggable interface
> >> may
> >>>>>>>> benefit
> >>>>>>>>>>>>> some
> >>>>>>>>>>>>>>>>> users' custom producer applications and Kafka Streams
> >>>>>>>>>> applications,
> >>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>> comes at significant deployment cost for other
> >>> low-/no-code
> >>>>>>>>>>>>>>> environments,
> >>>>>>>>>>>>>>>>> including but not limited to Kafka Connect and
> >>> MirrorMaker
> >>>> 2.
> >>>>>>>>> Can
> >>>>>>>>>>>>> we
> >>>>>>>>>>>>>>> add
> >>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>> default implementation of the exception handler that
> >>> allows
> >>>>>>>> for
> >>>>>>>>>>>>> some
> >>>>>>>>>>>>>>>> simple
> >>>>>>>>>>>>>>>>> behavior to be tweaked via configuration property? Two
> >>>> things
> >>>>>>>>> that
> >>>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>> nice to have would be A) an upper bound on the retry
> >> time
> >>>> for
> >>>>>>>>>>>>>>>>> unknown-topic-partition exceptions and B) an option to
> >>> drop
> >>>>>>>>>> records
> >>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>> are large enough to trigger a record-too-large
> >> exception.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 4) I'd still prefer to see "SKIP" or "DROP" instead of
> >>> the
> >>>>>>>>>> proposed
> >>>>>>>>>>>>>>>>> "SWALLOW" option, which IMO is opaque and non-obvious,
> >>>>>>>>> especially
> >>>>>>>>>>>>>> when
> >>>>>>>>>>>>>>>>> trying to guess the behavior for retriable errors.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Chris
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Fri, May 3, 2024 at 11:23 AM Alieh Saeedi
> >>>>>>>>>>>>>>>> <asae...@confluent.io.invalid
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> A summary of the KIP and the discussions:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> The KIP introduces a handler interface for Producer in
> >>>> order
> >>>>>>>> to
> >>>>>>>>>>>>>>> handle
> >>>>>>>>>>>>>>>>> two
> >>>>>>>>>>>>>>>>>> exceptions: RecordTooLargeException and
> >>>>>>>>>>>>>>>> UnknownTopicOrPartitionException.
> >>>>>>>>>>>>>>>>>> The handler handles the exceptions per-record.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> - Do we need this handler?  [Motivation and Examples
> >>>>>>>> sections]
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> RecordTooLargeException: 1) In transactions, the
> >>> producer
> >>>>>>>>>>>>> collects
> >>>>>>>>>>>>>>>>> multiple
> >>>>>>>>>>>>>>>>>> records in batches. Then a RecordTooLargeException
> >>> related
> >>>>>>>> to a
> >>>>>>>>>>>>>>> single
> >>>>>>>>>>>>>>>>>> record leads to failing the entire batch. A custom
> >>>> exception
> >>>>>>>>>>>>>> handler
> >>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>> this case may decide on dropping the record and
> >>> continuing
> >>>>>>>> the
> >>>>>>>>>>>>>>>>> processing.
> >>>>>>>>>>>>>>>>>> See Example 1, please. 2) Moreover, in Kafka
> >> Streams, a
> >>>>>>>> record
> >>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>> too
> >>>>>>>>>>>>>>>>>> large is a poison pill record, and there is no way to
> >>> skip
> >>>>>>>> over
> >>>>>>>>>>>>>> it. A
> >>>>>>>>>>>>>>>>>> handler would allow us to react to this error inside
> >> the
> >>>>>>>>>>>>> producer,
> >>>>>>>>>>>>>>>> i.e.,
> >>>>>>>>>>>>>>>>>> local to where the error happens, and thus simplify
> >> the
> >>>>>>>> overall
> >>>>>>>>>>>>>> code
> >>>>>>>>>>>>>>>>>> significantly. Please read the Motivation section for
> >>> more
> >>>>>>>>>>>>>>> explanation.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> UnknownTopicOrPartitionException: For this case, the
> >>>>> producer
> >>>>>>>>>>>>>> handles
> >>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>> exception internally and only issues a WARN log about
> >>>>> missing
> >>>>>>>>>>>>>>> metadata
> >>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>> retries internally. Later, when the producer hits "
> >>>>>>>>>>>>>>> delivery.timeout.ms"
> >>>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>> throws a TimeoutException, and the user can only
> >> blindly
> >>>>>>>> retry,
> >>>>>>>>>>>>>> which
> >>>>>>>>>>>>>>>> may
> >>>>>>>>>>>>>>>>>> result in an infinite retry loop. The thrown
> >>>>> TimeoutException
> >>>>>>>>>>>>>> "cuts"
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> connection to the underlying root cause of missing
> >>>> metadata
> >>>>>>>>>>>>> (which
> >>>>>>>>>>>>>>>> could
> >>>>>>>>>>>>>>>>>> indeed be a transient error but is persistent for a
> >>>>>>>>> non-existing
> >>>>>>>>>>>>>>>> topic).
> >>>>>>>>>>>>>>>>>> Thus, there is no programmatic way to break the
> >> infinite
> >>>>>>>> retry
> >>>>>>>>>>>>>> loop.
> >>>>>>>>>>>>>>>>> Kafka
> >>>>>>>>>>>>>>>>>> Streams also blindly retries for this case, and the
> >>>>>>>> application
> >>>>>>>>>>>>>> gets
> >>>>>>>>>>>>>>>>> stuck.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> - Having interface vs configuration option:
> >> [Motivation,
> >>>>>>>>>>>>> Examples,
> >>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>> Rejected Alternatives sections]
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Our solution is introducing an interface due to the
> >> full
> >>>>>>>>>>>>>> flexibility
> >>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>> it offers. Sometimes users, especially Kafka Streams
> >>> ones,
> >>>>>>>>>>>>>> determine
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> handler's behaviour based on the situation. For
> >>> example,
> >>>>>>>>>>>>>>>>>> facing UnknownTopicOrPartitionException, the user may
> >>>> want
> >>>>>>>> to
> >>>>>>>>>>>>>> raise
> >>>>>>>>>>>>>>> an
> >>>>>>>>>>>>>>>>>> error for some topics but retry it for other topics.
> >>>> Having
> >>>>> a
> >>>>>>>>>>>>>>>>> configuration
> >>>>>>>>>>>>>>>>>> option with a fixed set of possibilities does not
> >> serve
> >>>> the
> >>>>>>>>>>>>> user's
> >>>>>>>>>>>>>>>>>> needs. See Example 2, please.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> - Note on RecordTooLargeException: [Public Interfaces
> >>>>>>>> section]
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> If the custom handler decides on SWALLOW for
> >>>>>>>>>>>>>> RecordTooLargeException,
> >>>>>>>>>>>>>>>>> then
> >>>>>>>>>>>>>>>>>> this record will not be a part of the batch of
> >>>> transactions
> >>>>>>>> and
> >>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>> also
> >>>>>>>>>>>>>>>>>> not be sent to the broker in non-transactional mode.
> >> So
> >>> no
> >>>>>>>>>>>>> worries
> >>>>>>>>>>>>>>>> about
> >>>>>>>>>>>>>>>>>> getting a RecordTooLargeException from the broker in
> >>> this
> >>>>>>>> case,
> >>>>>>>>>>>>> as
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> record will never ever be sent to the broker. SWALLOW
> >>>> means
> >>>>>>>>> drop
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> record
> >>>>>>>>>>>>>>>>>> and continue/swallow the error.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> - What if the handle() method implements RETRY for
> >>>>>>>>>>>>>>>>> RecordTooLargeException?
> >>>>>>>>>>>>>>>>>> [Proposed Changes section]
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> We have to limit the user to only have FAIL or SWALLOW
> >>> for
> >>>>>>>>>>>>>>>>>> RecordTooLargeException. Actually, RETRY must be equal
> >>> to
> >>>>>>>> FAIL.
> >>>>>>>>>>>>>> This
> >>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>> well documented/informed in javadoc.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> - What if the handle() method of the handler throws an
> >>>>>>>>> exception?
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> The handler is expected to have correct code. If it
> >>> throws
> >>>>> an
> >>>>>>>>>>>>>>>> exception,
> >>>>>>>>>>>>>>>>>> everything fails.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> This is a PoC PR <
> >>>>> https://github.com/apache/kafka/pull/15846
> >>>>>>>>>
> >>>>>>>>>>>>> ONLY
> >>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>> RecordTooLargeException. The code changes related to
> >>>>>>>>>>>>>>>>>> UnknownTopicOrPartitionException will be added to this
> >>> PR
> >>>>>>>>> LATER.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Looking forward to your feedback again.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Alieh
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Thu, Apr 25, 2024 at 11:46 PM Kirk True <
> >>>>>>>> k...@kirktrue.pro>
> >>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Hi Alieh,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thanks for the updates!
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Comments inline...
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On Apr 25, 2024, at 1:10 PM, Alieh Saeedi
> >>>>>>>>>>>>>>>>> <asae...@confluent.io.INVALID
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Thanks a lot for the constructive feedback!
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Addressing some of the main concerns:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> - The `RecordTooLargeException` can be thrown by
> >>> broker,
> >>>>>>>>>>>>>> producer
> >>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>> consumer. Of course, the `ProducerExceptionHandler`
> >>>>>>>> interface
> >>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>> introduced
> >>>>>>>>>>>>>>>>>>>> to affect only the exceptions thrown from the
> >>> producer.
> >>>>>>>> This
> >>>>>>>>>>>>>> KIP
> >>>>>>>>>>>>>>>> very
> >>>>>>>>>>>>>>>>>>>> specifically means to provide a possibility to
> >> manage
> >>>> the
> >>>>>>>>>>>>>>>>>>>> `RecordTooLargeException` thrown from the
> >>>> Producer.send()
> >>>>>>>>>>>>>> method.
> >>>>>>>>>>>>>>>>>> Please
> >>>>>>>>>>>>>>>>>>>> see “Proposed Changes” section for more clarity. I
> >>>>>>>>>>>>> investigated
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> issue
> >>>>>>>>>>>>>>>>>>>> there thoroughly. I hope it can explain the concern
> >>>> about
> >>>>>>>> how
> >>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>> handle
> >>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> errors as well.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> - The problem with Callback: Methods of Callback are
> >>>>> called
> >>>>>>>>>>>>>> when
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> record
> >>>>>>>>>>>>>>>>>>>> sent to the server is acknowledged, while this is
> >> not
> >>>> the
> >>>>>>>>>>>>>> desired
> >>>>>>>>>>>>>>>>> time
> >>>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>> all exceptions. We intend to handle exceptions
> >>>> beforehand.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I guess it makes sense to keep the expectation for
> >> when
> >>>>>>>>>>>>> Callback
> >>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>> invoked as-is vs. shoehorning more into it.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> - What if the custom handler returns RETRY for
> >>>>>>>>>>>>>>>>>>> `RecordTooLargeException`? I
> >>>>>>>>>>>>>>>>>>>> assume changing the producer configuration at
> >> runtime
> >>> is
> >>>>>>>>>>>>>>> possible.
> >>>>>>>>>>>>>>>> If
> >>>>>>>>>>>>>>>>>> so,
> >>>>>>>>>>>>>>>>>>>> RETRY for a too large record is valid because maybe
> >> in
> >>>> the
> >>>>>>>>>>>>> next
> >>>>>>>>>>>>>>>> try,
> >>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> too large record is not poisoning any more. I am not
> >>>> 100%
> >>>>>>>>>>>>> sure
> >>>>>>>>>>>>>>>> about
> >>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> technical details, though. Otherwise, we can
> >> consider
> >>>> the
> >>>>>>>>>>>>> RETRY
> >>>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>> FAIL
> >>>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>> this exception. Another solution would be to
> >> consider
> >>> a
> >>>>>>>>>>>>>> constant
> >>>>>>>>>>>>>>>>> number
> >>>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>> times for RETRY which can be useful for other
> >>> exceptions
> >>>>> as
> >>>>>>>>>>>>>> well.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> It’s not presently possible to change the
> >> configuration
> >>>> of
> >>>>>>>> an
> >>>>>>>>>>>>>>>> existing
> >>>>>>>>>>>>>>>>>>> Producer at runtime. So if a record hits a
> >>>>>>>>>>>>>> RecordTooLargeException
> >>>>>>>>>>>>>>>>> once,
> >>>>>>>>>>>>>>>>>> no
> >>>>>>>>>>>>>>>>>>> amount of retrying (with the current Producer) will
> >>>> change
> >>>>>>>>> that
> >>>>>>>>>>>>>>> fact.
> >>>>>>>>>>>>>>>>> So
> >>>>>>>>>>>>>>>>>>> I’m still a little stuck on how to handle a response
> >> of
> >>>>>>>> RETRY
> >>>>>>>>>>>>> for
> >>>>>>>>>>>>>>> an
> >>>>>>>>>>>>>>>>>>> “oversized” record.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> - What if the handle() method itself throws an
> >>>> exception?
> >>>>> I
> >>>>>>>>>>>>>> think
> >>>>>>>>>>>>>>>>>>>> rationally and pragmatically, the behaviour must be
> >>>>> exactly
> >>>>>>>>>>>>>> like
> >>>>>>>>>>>>>>>> when
> >>>>>>>>>>>>>>>>>> no
> >>>>>>>>>>>>>>>>>>>> custom handler is defined since the user actually
> >> did
> >>>> not
> >>>>>>>>>>>>> have
> >>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>> working
> >>>>>>>>>>>>>>>>>>>> handler.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I’m not convinced that ignoring an errant handler is
> >>> the
> >>>>>>>> right
> >>>>>>>>>>>>>>>> choice.
> >>>>>>>>>>>>>>>>> It
> >>>>>>>>>>>>>>>>>>> then becomes a silent failure that might have
> >>>>> repercussions,
> >>>>>>>>>>>>>>>> depending
> >>>>>>>>>>>>>>>>> on
> >>>>>>>>>>>>>>>>>>> the business logic. A user would have to proactively
> >>> trawl
> >>>>>>>>>>>>>> through
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> logs for WARN/ERROR messages to catch it.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Throwing a hard error is pretty draconian, though…
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> - Why not use config parameters instead of an
> >>> interface?
> >>>>> As
> >>>>>>>>>>>>>>>> explained
> >>>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>> the “Rejected Alternatives” section, we assume that
> >>> the
> >>>>>>>>>>>>> handler
> >>>>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>> used for a greater number of exceptions in the
> >> future.
> >>>>>>>>>>>>>> Defining a
> >>>>>>>>>>>>>>>>>>>> configuration parameter for each exception may make
> >>> the
> >>>>>>>>>>>>>>>>> configuration a
> >>>>>>>>>>>>>>>>>>> bit
> >>>>>>>>>>>>>>>>>>>> messy. Moreover, the handler offers more
> >> flexibility.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Agreed that the logic-via-configuration approach is
> >>> weird
> >>>>>>>> and
> >>>>>>>>>>>>>>>> limiting.
> >>>>>>>>>>>>>>>>>>> Forget I ever suggested it ;)
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I’d think additional background in the Motivation
> >>> section
> >>>>>>>>> would
> >>>>>>>>>>>>>>> help
> >>>>>>>>>>>>>>>> me
> >>>>>>>>>>>>>>>>>>> understand how users might use this feature beyond a)
> >>>>>>>> skipping
> >>>>>>>>>>>>>>>>>> “oversized”
> >>>>>>>>>>>>>>>>>>> records, and b) not retrying missing topics.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Small change:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> -ProductionExceptionHandlerResponse -> Response for
> >>>>> brevity
> >>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>> simplicity.
> >>>>>>>>>>>>>>>>>>>> Could’ve been HandlerResponse too I think!
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> The name change sounds good to me.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thanks Alieh!
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I thank you all again for your useful
> >>>>>>>> questions/suggestions.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I would be happy to hear more of your concerns, as
> >>>> stated
> >>>>>>>> in
> >>>>>>>>>>>>>> some
> >>>>>>>>>>>>>>>>>>> feedback.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>>>> Alieh
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On Wed, Apr 24, 2024 at 12:31 AM Justine Olshan
> >>>>>>>>>>>>>>>>>>>> <jols...@confluent.io.invalid> wrote:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Thanks Alieh for the updates.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> I'm a little concerned about the design pattern
> >> here.
> >>>> It
> >>>>>>>>>>>>> seems
> >>>>>>>>>>>>>>>> like
> >>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>> want
> >>>>>>>>>>>>>>>>>>>>> specific usages, but we are packaging it as a
> >> generic
> >>>>>>>>>>>>> handler.
> >>>>>>>>>>>>>>>>>>>>> I think we tried to narrow down on the specific
> >>> errors
> >>>> we
> >>>>>>>>>>>>> want
> >>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>> handle,
> >>>>>>>>>>>>>>>>>>>>> but it feels a little clunky as we have a generic
> >>> thing
> >>>>>>>> for
> >>>>>>>>>>>>>> two
> >>>>>>>>>>>>>>>>>> specific
> >>>>>>>>>>>>>>>>>>>>> errors.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> I'm wondering if we are using the right patterns to
> >>>> solve
> >>>>>>>>>>>>>> these
> >>>>>>>>>>>>>>>>>>> problems. I
> >>>>>>>>>>>>>>>>>>>>> agree though that we will need something more than
> >>> the
> >>>>>>>> error
> >>>>>>>>>>>>>>>> classes
> >>>>>>>>>>>>>>>>>> I'm
> >>>>>>>>>>>>>>>>>>>>> proposing if we want to have different handling be
> >>>>>>>>>>>>>> configurable.
> >>>>>>>>>>>>>>>>>>>>> My concern is that the open-endedness of a handler
> >>>> means
> >>>>>>>>>>>>> that
> >>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>> are
> >>>>>>>>>>>>>>>>>>>>> creating more problems than we are solving. It is
> >>> still
> >>>>>>>>>>>>>> unclear
> >>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>> me
> >>>>>>>>>>>>>>>>>>> how
> >>>>>>>>>>>>>>>>>>>>> we expect to handle the errors. Perhaps we could
> >>>> include
> >>>>>>>> an
> >>>>>>>>>>>>>>>> example?
> >>>>>>>>>>>>>>>>>> It
> >>>>>>>>>>>>>>>>>>>>> seems like there is a specific use case in mind and
> >>>> maybe
> >>>>>>>> we
> >>>>>>>>>>>>>> can
>
>
>
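
For readers following the quoted discussion: the two-method handler shape Chris
suggests in his point 2), combined with Artem's [AL1] point about reacting only
to specific exceptions, might look roughly like the sketch below. Everything in
it is hypothetical and only illustrates the idea; it is not the interface from
the KIP or the PoC PR. The exception classes are local stand-ins so the example
compiles without Kafka on the classpath, the record is reduced to its topic
name, the response enums and SelectiveHandler are invented for illustration,
and defaulting both methods to FAIL is an assumption of this sketch.

```java
import java.util.Set;

// Local stand-ins for the Kafka exception classes (assumption: kept here only
// so the sketch is self-contained).
class RecordTooLargeException extends RuntimeException {}
class UnknownTopicOrPartitionException extends RuntimeException {}

// Hypothetical response types, using the "Retriable" spelling from the thread.
enum NonRetriableResponse { FAIL, SWALLOW }
enum RetriableResponse { FAIL, SWALLOW, RETRY }

// Hypothetical handler shape: one method per error category. Both default to
// FAIL in this sketch, so an unimplemented case never drops a record silently.
interface ProducerExceptionHandler {
    default NonRetriableResponse handle(String topic, Exception e) {
        return NonRetriableResponse.FAIL;
    }

    default RetriableResponse handleRetriable(String topic, Exception e) {
        return RetriableResponse.FAIL;
    }
}

// Example handler: swallows oversized records only for topics the application
// explicitly opted in, retries unknown topics, and fails on anything else.
class SelectiveHandler implements ProducerExceptionHandler {
    private final Set<String> droppableTopics;

    SelectiveHandler(Set<String> droppableTopics) {
        this.droppableTopics = droppableTopics;
    }

    @Override
    public NonRetriableResponse handle(String topic, Exception e) {
        if (e instanceof RecordTooLargeException && droppableTopics.contains(topic)) {
            return NonRetriableResponse.SWALLOW;
        }
        return NonRetriableResponse.FAIL;
    }

    @Override
    public RetriableResponse handleRetriable(String topic, Exception e) {
        if (e instanceof UnknownTopicOrPartitionException) {
            return RetriableResponse.RETRY;
        }
        return RetriableResponse.FAIL;
    }
}
```

Because the handler checks the exception type explicitly and falls back to
FAIL, an exception type added to the interface later would fail loudly instead
of being dropped, which is the scenario Artem describes in [AL1].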
