Makes sense to me.

Thanks Matthias for summarizing the state.

Justine

On Mon, Jun 17, 2024 at 1:12 PM Matthias J. Sax <mj...@apache.org> wrote:

> Hey,
>
> seems this KIP is very difficult, and we actually had a lot of
> background discussion about it in the last weeks. I believe the problem
> with this KIP is, that we have 3 starting points to look at a problem,
> but these 3 starting points don't align:
>
>
>   1. Producer API: we want a clean API design or developers using the
> producer directly (clean semantics, not footguns...)
>
>   2. Kafka Streams: as a power user of the producer, we want to have
> advanced capabilities; given how KS works internally, we need a "power
> API" on he producer
>
>   3. Kafka Connect: also a power user of the producer. However, Connect
> is a framework not a programming API and thus prefers a config based
> approach
>
>
> I also think we got one idea wrong: let the user code / handler take
> care of retries. (I guess that's on me, I started with the idea to have
> a third return code RETRY...) -- the handler has not enough context
> information, and making this information available leads to a very
> clumsy interface. (Defeats (1) from above.)
>
> I believe, if we would move forward with the handler, we would need to
> let the producer do retries, and only call the handler after all
> retries/timeout are exhausted. However, for this to work, we need a
> producer config for Connect, what basically defeats the purpose for (2)
> to make it a programmatic solution (it seems somewhat redundant)
>
> Also, the idea to make the handler configurable, in hindsight, seems
> like a poor approach / bad compromise to address (3) w/o sacrificing
> (2), but is a problem for (1).
>
>
> We also discussed the "missing metadata" case, and actually believe we
> can address it w/o a public API change. Alieh put up a PR for this
> already: https://github.com/apache/kafka/pull/16344
>
>
> This leaves us with the "producer error state problem for EOS" but it
> might be better to solve this differently. Alieh started KIP-1059 for
> this case now.
>
>
> Thus, it seems we should DISCARD this KIP, and the Connect team can do a
> follow up KIP to add the producer configs they need for their own
> situation.
>
> Splitting the solutions tailored for the different situations seems to
> lead to an overall cleaner solution to the problem.
>
> Thoughts?
>
>
> -Matthias
>
>
>
> On 5/15/24 12:30 AM, Federico Valeri wrote:
> > Hello Alieh, thanks for this useful KIP.
> >
> > There is a typo in the motivation when you talk about the
> > UnknownTopicOrPartitionException. It's delivery.timeout.ms, not
> > deliver.timeout.ms.
> >
> > In the past, I did some work to improve and clean the official Kafka
> > examples, which I think are useful for new Kafka users. I was
> > wondering if it is worth to improve them in order to show the correct
> > usage of this new interface. If you agree, maybe we could mention this
> > in the proposed changes.
> >
> >> The accepted responses for RecordTooLargeException are FAIL and
> SWALLOW. Therefore, RETRY will be interpreted and executed as FAIL.
> >
> > Why do we need this javadoc note? I think it's not possible to return
> > RETRY in the current form.
> >
> > When we talk about swallowing in the default implementation, I think
> > we will log an error/warning and drop the record right? If yes, should
> > we clarify this and improve the DROP_INVALID_LARGE_RECORDS_DOC by
> > mentioning the logging part?
> >
> > Should we mention somewhere which logic takes precedence when both the
> > interface and configs are used?
> >
> > On Tue, May 14, 2024 at 4:45 PM Chris Egerton <chr...@aiven.io.invalid>
> wrote:
> >>
> >> Hi Alieh,
> >>
> >> Thank you for all the updates! One final question--how will the retry
> >> timeout for unknown topic partition errors be implemented? I think it
> would
> >> be best if this could be done with an implementation of the error
> handler,
> >> but I don't see a way to track the necessary information with the
> >> current ProducerExceptionHandler interface.
> >>
> >> Cheers,
> >>
> >> Chris
> >>
> >> On Tue, May 14, 2024 at 9:10 AM Alieh Saeedi
> <asae...@confluent.io.invalid>
> >> wrote:
> >>
> >>> Thanks Andrew. Done :)
> >>>
> >>> @Chris: I changed the config parameter type from boolean to integer,
> which
> >>> defines the timeout for retrying. I thought reusing `max.block.ms`
> was not
> >>> reasonable as you mentioned.
> >>>
> >>> So if the KIP looks good, let 's skip to the good part ;-) VOTING :)
> >>>
> >>> Bests,
> >>> Alieh
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> On Tue, May 14, 2024 at 12:26 PM Andrew Schofield <
> >>> andrew_schofi...@live.com>
> >>> wrote:
> >>>
> >>>> Hi Alieh,
> >>>> Just one final comment.
> >>>>
> >>>> [AJS5] Existing classes use Retriable, not Retryable. For example:
> >>>>
> >>>>
> >>>
> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/errors/RetriableException.html
> >>>>
> >>>> I suggest RetriableResponse and NonRetriableResponse.
> >>>>
> >>>> Thanks,
> >>>> Andrew
> >>>>
> >>>>> On 13 May 2024, at 23:17, Alieh Saeedi <asae...@confluent.io.INVALID
> >
> >>>> wrote:
> >>>>>
> >>>>> Hi all,
> >>>>>
> >>>>>
> >>>>> Thanks for all the valid points you listed.
> >>>>>
> >>>>>
> >>>>> KIP updates and addressing concerns:
> >>>>>
> >>>>>
> >>>>> 1) The KIP now suggests two Response types: `RetryableResponse` and
> >>>>> `NonRetryableResponse`
> >>>>>
> >>>>>
> >>>>> 2) `custom.exception.handler` is changed to
> >>>> `custom.exception.handler.class`
> >>>>>
> >>>>>
> >>>>> 3) The KIP clarifies that `In the case of an implemented handler for
> >>> the
> >>>>> specified exception, the handler takes precedence.`
> >>>>>
> >>>>>
> >>>>> 4)  There is now a `default` implementation for both handle()
> methods.
> >>>>>
> >>>>>
> >>>>> 5)  @Chris: for `UnknownTopicOrPartition`, the default is already
> >>>> retrying
> >>>>> for 60s. (In fact, the default value of `max.block.ms`). If the
> >>> handler
> >>>>> instructs to FAIL or SWALLOW, there will be no retry, and if the
> >>> handler
> >>>>> instructs to RETRY, that will be the default behavior, which follows
> >>> the
> >>>>> values in already existing config parameters such as `max.block.ms`.
> >>>> Does
> >>>>> that make sense?
> >>>>>
> >>>>>
> >>>>> Hope the changes and explanations are convincing :)
> >>>>>
> >>>>>
> >>>>> Cheers,
> >>>>>
> >>>>> Alieh
> >>>>>
> >>>>> On Mon, May 13, 2024 at 6:40 PM Justine Olshan
> >>>> <jols...@confluent.io.invalid>
> >>>>> wrote:
> >>>>>
> >>>>>> Oh I see. The type isn't the error type but a newly defined type for
> >>> the
> >>>>>> response. Makes sense and works for me.
> >>>>>>
> >>>>>> Justine
> >>>>>>
> >>>>>> On Mon, May 13, 2024 at 9:13 AM Chris Egerton <
> >>> fearthecel...@gmail.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> If we have dedicated methods for each kind of exception
> >>>>>>> (handleRecordTooLarge, handleUnknownTopicOrPartition, etc.),
> doesn't
> >>>> that
> >>>>>>> provide sufficient constraint? I'm not suggesting we eliminate
> these
> >>>>>>> methods, just that we change their return types to something more
> >>>>>> flexible.
> >>>>>>>
> >>>>>>> On Mon, May 13, 2024, 12:07 Justine Olshan
> >>>> <jols...@confluent.io.invalid
> >>>>>>>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> I'm not sure I agree with the Retriable and NonRetriableResponse
> >>>>>> comment.
> >>>>>>>> This doesn't limit the blast radius or enforce certain errors are
> >>>> used.
> >>>>>>>> I think we might disagree on how controlled these interfaces can
> >>> be...
> >>>>>>>>
> >>>>>>>> Justine
> >>>>>>>>
> >>>>>>>> On Mon, May 13, 2024 at 8:40 AM Chris Egerton
> >>> <chr...@aiven.io.invalid
> >>>>>>>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Alieh,
> >>>>>>>>>
> >>>>>>>>> Thanks for the updates! I just have a few more thoughts:
> >>>>>>>>>
> >>>>>>>>> - I don't think a boolean property is sufficient to dictate
> retries
> >>>>>> for
> >>>>>>>>> unknown topic partitions, though. These errors can occur if a
> topic
> >>>>>> has
> >>>>>>>>> just been created, which can occur if, for example, automatic
> topic
> >>>>>>>>> creation is enabled for a multi-task connector. This is why I
> >>>>>> proposed
> >>>>>>> a
> >>>>>>>>> timeout instead of a boolean (and see my previous email for why
> >>>>>>> reducing
> >>>>>>>>> max.block.ms for a producer is not a viable alternative). If it
> >>>>>> helps,
> >>>>>>>> one
> >>>>>>>>> way to reproduce this yourself is to add the line
> >>>>>>>>> `fooProps.put(TASKS_MAX_CONFIG, "10");` to the integration test
> >>> here:
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>
> >>>
> https://github.com/apache/kafka/blob/5439914c32fa00d634efa7219699f1bc21add839/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java#L134
> >>>>>>>>> and then check the logs afterward for messages like "Error while
> >>>>>>> fetching
> >>>>>>>>> metadata with correlation id <n> :
> >>>>>>>> {foo-topic=UNKNOWN_TOPIC_OR_PARTITION}".
> >>>>>>>>>
> >>>>>>>>> - I also don't think we need custom XxxResponse enums for every
> >>>>>>> possible
> >>>>>>>>> method; it seems like this will lead to a lot of duplication and
> >>>>>>>> cognitive
> >>>>>>>>> overhead if we want to expand the error handler in the future.
> >>>>>>> Something
> >>>>>>>>> more flexible like RetriableResponse and NonRetriableResponse
> could
> >>>>>>>>> suffice.
> >>>>>>>>>
> >>>>>>>>> - Finally, the KIP still doesn't state how the handler will or
> >>> won't
> >>>>>>> take
> >>>>>>>>> precedence over existing retry properties. If I set `retries` or
> `
> >>>>>>>>> delivery.timeout.ms` or `max.block.ms` to low values, will that
> >>>>>> cause
> >>>>>>>>> retries to cease even if my custom handler would otherwise keep
> >>>>>>> returning
> >>>>>>>>> RETRY for an error?
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>>
> >>>>>>>>> Chris
> >>>>>>>>>
> >>>>>>>>> On Mon, May 13, 2024 at 11:02 AM Andrew Schofield <
> >>>>>>>>> andrew_schofi...@live.com>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Alieh,
> >>>>>>>>>> Just a few more comments on the KIP. It is looking much less
> risky
> >>>>>>> now
> >>>>>>>>> the
> >>>>>>>>>> scope
> >>>>>>>>>> is tighter.
> >>>>>>>>>>
> >>>>>>>>>> [AJS1] It would be nice to have default implementations of the
> >>>>>> handle
> >>>>>>>>>> methods
> >>>>>>>>>> so an implementor would not need to implement both themselves.
> >>>>>>>>>>
> >>>>>>>>>> [AJS2] Producer configurations which are class names usually end
> >>> in
> >>>>>>>>>> “.class”.
> >>>>>>>>>> I suggest “custom.exception.handler.class”.
> >>>>>>>>>>
> >>>>>>>>>> [AJS3] If I implemented a handler, and I set a non-default value
> >>>>>> for
> >>>>>>>> one
> >>>>>>>>>> of the
> >>>>>>>>>> new configuations, what happens? I would expect that the handler
> >>>>>>> takes
> >>>>>>>>>> precedence. I wasn’t quite clear what “the control will follow
> the
> >>>>>>>>> handler
> >>>>>>>>>> instructions” meant.
> >>>>>>>>>>
> >>>>>>>>>> [AJS4] Because you now have an enum for the
> >>>>>>>>>> RecordTooLargeExceptionResponse,
> >>>>>>>>>> I don’t think you need to state in the comment for
> >>>>>>>>>> ProducerExceptionHandler that
> >>>>>>>>>> RETRY will be interpreted as FAIL.
> >>>>>>>>>>
> >>>>>>>>>> Thanks,
> >>>>>>>>>> Andrew
> >>>>>>>>>>
> >>>>>>>>>>> On 13 May 2024, at 14:53, Alieh Saeedi
> >>>>>>> <asae...@confluent.io.INVALID
> >>>>>>>>>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Hi all,
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for the very interesting discussion during my PTO.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> KIP updates and addressing concerns:
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> 1) Two handle() methods are defined in ProducerExceptionHandler
> >>>>>> for
> >>>>>>>> the
> >>>>>>>>>> two
> >>>>>>>>>>> exceptions with different input parameters so that we have
> >>>>>>>>>>> handle(RecordTooLargeException e, ProducerRecord record) and
> >>>>>>>>>>> handle(UnknownTopicOrPartitionException e, ProducerRecord
> record)
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> 2) The ProducerExceptionHandler extends `Closable` as well.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> 3) The KIP suggests having two more configuration parameters
> with
> >>>>>>>>> boolean
> >>>>>>>>>>> values:
> >>>>>>>>>>>
> >>>>>>>>>>> - `drop.invalid.large.records` with a default value of `false`
> >>>>>> for
> >>>>>>>>>>> swallowing too large records.
> >>>>>>>>>>>
> >>>>>>>>>>> - `retry.unknown.topic.partition` with a default value of
> `true`
> >>>>>>> that
> >>>>>>>>>>> performs RETRY for `max.block.ms` ms, encountering the
> >>>>>>>>>>> UnknownTopicOrPartitionException.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Hope the main concerns are addressed so that we can go forward
> >>>>>> with
> >>>>>>>>>> voting.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Cheers,
> >>>>>>>>>>>
> >>>>>>>>>>> Alieh
> >>>>>>>>>>>
> >>>>>>>>>>> On Thu, May 9, 2024 at 11:25 PM Artem Livshits
> >>>>>>>>>>> <alivsh...@confluent.io.invalid> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi Mathias,
> >>>>>>>>>>>>
> >>>>>>>>>>>>> [AL1] While I see the point, I would think having a different
> >>>>>>>>> callback
> >>>>>>>>>>>> for every exception might not really be elegant?
> >>>>>>>>>>>>
> >>>>>>>>>>>> I'm not sure how to assess the level of elegance of the
> >>>>>> proposal,
> >>>>>>>> but
> >>>>>>>>> I
> >>>>>>>>>> can
> >>>>>>>>>>>> comment on the technical characteristics:
> >>>>>>>>>>>>
> >>>>>>>>>>>> 1. Having specific interfaces that codify the logic that is
> >>>>>>>> currently
> >>>>>>>>>>>> prescribed in the comments reduce the chance of making a
> >>>>>> mistake.
> >>>>>>>>>>>> Commments may get ignored, misuderstood or etc. but if the
> >>>>>>> contract
> >>>>>>>> is
> >>>>>>>>>>>> codified, the compilier will help to enforce the contract.
> >>>>>>>>>>>> 2. Given that the logic is trickier than it seems (the
> >>>>>>>>> record-too-large
> >>>>>>>>>> is
> >>>>>>>>>>>> an example that can easily confuse someone who's not
> intimately
> >>>>>>>>> familiar
> >>>>>>>>>>>> with the nuances of the batching logic), having a little more
> >>>>>>> hoops
> >>>>>>>> to
> >>>>>>>>>> jump
> >>>>>>>>>>>> would give a greater chance that whoever tries to add a new
> >>>>>> cases
> >>>>>>>>> pauses
> >>>>>>>>>>>> and thinks a bit more.
> >>>>>>>>>>>> 3. As Justine pointed out, having different method will be a
> >>>>>>> forcing
> >>>>>>>>>>>> function to go through a KIP rather than smuggle new cases
> >>>>>> through
> >>>>>>>>>>>> implementation.
> >>>>>>>>>>>> 4. Sort of a consequence of the previous 3 -- all those things
> >>>>>>>> reduce
> >>>>>>>>>> the
> >>>>>>>>>>>> chance of someone writing the code that works with 2 errors
> and
> >>>>>>> then
> >>>>>>>>>> when
> >>>>>>>>>>>> more errors are added in the future will suddenly incorrectly
> >>>>>>> ignore
> >>>>>>>>> new
> >>>>>>>>>>>> errors (the example I gave in the previous email).
> >>>>>>>>>>>>
> >>>>>>>>>>>>> [AL2 cont.] Similar to AL1, I see such a handler to some
> extend
> >>>>>>> as
> >>>>>>>>>>>> business logic. If a user puts a bad filter condition in their
> >>>>>> KS
> >>>>>>>> app,
> >>>>>>>>>> and
> >>>>>>>>>>>> drops messages
> >>>>>>>>>>>>
> >>>>>>>>>>>> I agree that there is always a chance to get a bug and lose
> >>>>>>>> messages,
> >>>>>>>>>> but
> >>>>>>>>>>>> there are generally separation of concerns that has different
> >>>>>> risk
> >>>>>>>>>> profile:
> >>>>>>>>>>>> the filtering logic may be more rigorously tested and rarely
> >>>>>>> changed
> >>>>>>>>>> (say
> >>>>>>>>>>>> an application developer does it), but setting the topics to
> >>>>>>> produce
> >>>>>>>>>> may be
> >>>>>>>>>>>> done via configuration (e.g. a user of the application does
> it)
> >>>>>>> and
> >>>>>>>>> it's
> >>>>>>>>>>>> generally an expectation that users would get an error when
> >>>>>>>>>> configuration
> >>>>>>>>>>>> is incorrect.
> >>>>>>>>>>>>
> >>>>>>>>>>>> What could be worse is that UnknownTopicOrPartitionException
> can
> >>>>>>> be
> >>>>>>>> an
> >>>>>>>>>>>> intermittent error, i.e. with a generally correct
> configuration,
> >>>>>>>> there
> >>>>>>>>>>>> could be metadata propagation problem on the cluster and then
> a
> >>>>>>>> random
> >>>>>>>>>> set
> >>>>>>>>>>>> of records could get lost.
> >>>>>>>>>>>>
> >>>>>>>>>>>>> [AL3] Maybe I misunderstand what you are saying, but to me,
> >>>>>>>> checking
> >>>>>>>>>> the
> >>>>>>>>>>>> size of the record upfront is exactly what the KIP proposes?
> No?
> >>>>>>>>>>>>
> >>>>>>>>>>>> It achieves the same result but solves it differently, my
> >>>>>>> proposal:
> >>>>>>>>>>>>
> >>>>>>>>>>>> 1. Application checks the validity of a record (maybe via a
> new
> >>>>>>>>>>>> validateRecord method) before producing it, and can just
> exclude
> >>>>>>> it
> >>>>>>>> or
> >>>>>>>>>>>> return an error to the user.
> >>>>>>>>>>>> 2. Application produces the record -- at this point there are
> no
> >>>>>>>>> records
> >>>>>>>>>>>> that could return record too large, they were either skipped
> at
> >>>>>>>> step 1
> >>>>>>>>>> or
> >>>>>>>>>>>> we didn't get here because step 1 failed.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Vs. KIP's proposal
> >>>>>>>>>>>>
> >>>>>>>>>>>> 1. Application produces the record.
> >>>>>>>>>>>> 2. Application gets a callback.
> >>>>>>>>>>>> 3. Application returns the action on how to proceed.
> >>>>>>>>>>>>
> >>>>>>>>>>>> The advantage of the former is the clarity of semantics -- the
> >>>>>>>> record
> >>>>>>>>> is
> >>>>>>>>>>>> invalid (property of the record, not a function of server
> state
> >>>>>> or
> >>>>>>>>>> server
> >>>>>>>>>>>> configuration) and we can clearly know that it is the record
> >>>>>> that
> >>>>>>> is
> >>>>>>>>> bad
> >>>>>>>>>>>> and can never succeed.
> >>>>>>>>>>>>
> >>>>>>>>>>>> The KIP-proposed way actually has a very tricky point: it
> >>>>>> actually
> >>>>>>>>>> handles
> >>>>>>>>>>>> a subset of record-too-large exceptions.  The broker can
> return
> >>>>>>>>>>>> record-too-large and reject the whole batch (but we don't want
> >>>>>> to
> >>>>>>>>> ignore
> >>>>>>>>>>>> those because then we can skip random records that just
> happened
> >>>>>>> to
> >>>>>>>> be
> >>>>>>>>>> in
> >>>>>>>>>>>> the same batch), in some sense we use the same error for 2
> >>>>>>> different
> >>>>>>>>>>>> conditions and understanding that requires pretty deep
> >>>>>>> understanding
> >>>>>>>>> of
> >>>>>>>>>>>> Kafka internals.
> >>>>>>>>>>>>
> >>>>>>>>>>>> -Artem
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Wed, May 8, 2024 at 9:47 AM Justine Olshan
> >>>>>>>>>> <jols...@confluent.io.invalid
> >>>>>>>>>>>>>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> My concern with respect to it being fragile: the code that
> >>>>>>> ensures
> >>>>>>>>> the
> >>>>>>>>>>>>> error type is internal to the producer. Someone may see it
> and
> >>>>>>>> say, I
> >>>>>>>>>>>> want
> >>>>>>>>>>>>> to add such and such error. This looks like internal code,
> so I
> >>>>>>>> don't
> >>>>>>>>>>>> need
> >>>>>>>>>>>>> a KIP, and then they can change it to whatever they want
> >>>>>> thinking
> >>>>>>>> it
> >>>>>>>>> is
> >>>>>>>>>>>>> within the typical kafka improvement protocol.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Relying on an internal change to enforce an external API is
> >>>>>>> fragile
> >>>>>>>>> in
> >>>>>>>>>> my
> >>>>>>>>>>>>> opinion. That's why I sort of agreed with Artem with
> enforcing
> >>>>>>> the
> >>>>>>>>>> error
> >>>>>>>>>>>> in
> >>>>>>>>>>>>> the method signature -- part of the public API.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Chris's comments on requiring more information to handler
> again
> >>>>>>>> makes
> >>>>>>>>>> me
> >>>>>>>>>>>>> wonder if we are solving a problem of lack of information at
> >>>>>> the
> >>>>>>>>>>>>> application level with a more powerful solution than we need.
> >>>>>>> (Ie,
> >>>>>>>> if
> >>>>>>>>>> we
> >>>>>>>>>>>>> had more information, could the application close and restart
> >>>>>> the
> >>>>>>>>>>>>> transaction rather than having to drop records) But I am
> happy
> >>>>>> to
> >>>>>>>>>>>>> compromise with a handler that we can agree is sufficiently
> >>>>>>>>> controlled
> >>>>>>>>>>>> and
> >>>>>>>>>>>>> documented.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Justine
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Wed, May 8, 2024 at 7:20 AM Chris Egerton
> >>>>>>>> <chr...@aiven.io.invalid
> >>>>>>>>>>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi Alieh,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Continuing prior discussions:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 1) Regarding the "flexibility" discussion, my overarching
> >>>>>> point
> >>>>>>> is
> >>>>>>>>>>>> that I
> >>>>>>>>>>>>>> don't see the point in allowing for this kind of pluggable
> >>>>>> logic
> >>>>>>>>>>>> without
> >>>>>>>>>>>>>> also covering more scenarios. Take example 2 in the KIP: if
> >>>>>>> we're
> >>>>>>>>>> going
> >>>>>>>>>>>>> to
> >>>>>>>>>>>>>> implement retries only on "important" topics when a topic
> >>>>>>>> partition
> >>>>>>>>>>>> isn't
> >>>>>>>>>>>>>> found, why wouldn't we also want to be able to do this for
> >>>>>> other
> >>>>>>>>>>>> errors?
> >>>>>>>>>>>>>> Again, taking authorization errors as an example, why
> wouldn't
> >>>>>>> we
> >>>>>>>>> want
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>> be able to fail when we can't write to "important" topics
> >>>>>>> because
> >>>>>>>>> the
> >>>>>>>>>>>>>> producer principal lacks sufficient ACLs, and drop the
> record
> >>>>>> if
> >>>>>>>> the
> >>>>>>>>>>>>> topic
> >>>>>>>>>>>>>> isn't "important"? In a security-conscious environment with
> >>>>>>>>>>>>>> runtime-dependent topic routing (which is a common feature
> of
> >>>>>>> many
> >>>>>>>>>>>> source
> >>>>>>>>>>>>>> connectors, such as the Debezium connectors), this seems
> >>>>>> fairly
> >>>>>>>>>> likely.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 2) As far as changing the shape of the API goes, I like
> >>>>>> Artem's
> >>>>>>>> idea
> >>>>>>>>>> of
> >>>>>>>>>>>>>> splitting out the interface based on specific exceptions.
> This
> >>>>>>> may
> >>>>>>>>> be
> >>>>>>>>>> a
> >>>>>>>>>>>>>> little laborious to expand in the future, but if we really
> >>>>>> want
> >>>>>>> to
> >>>>>>>>>>>>>> limit the exceptions that we cover with the handler and move
> >>>>>>>> slowly
> >>>>>>>>>> and
> >>>>>>>>>>>>>> cautiously, then IMO it'd be reasonable to reflect that in
> the
> >>>>>>>>>>>>> interface. I
> >>>>>>>>>>>>>> also acknowledge that there's no way to completely prevent
> >>>>>>> people
> >>>>>>>>> from
> >>>>>>>>>>>>>> shooting themselves in the foot by implementing the API
> >>>>>>>> incorrectly,
> >>>>>>>>>>>> but
> >>>>>>>>>>>>> I
> >>>>>>>>>>>>>> think it's worth it to do what we can--including leveraging
> >>>>>> the
> >>>>>>>> Java
> >>>>>>>>>>>>>> language's type system--to help them, so IMO there's value
> to
> >>>>>>>>>>>> eliminating
> >>>>>>>>>>>>>> the implicit behavior of failing when a policy returns RETRY
> >>>>>>> for a
> >>>>>>>>>>>>>> non-retriable error. This can take a variety of shapes and
> I'm
> >>>>>>> not
> >>>>>>>>>>>> going
> >>>>>>>>>>>>> to
> >>>>>>>>>>>>>> insist on anything specific, but I do want to again raise my
> >>>>>>>>> concerns
> >>>>>>>>>>>>> with
> >>>>>>>>>>>>>> the current proposal and request that we find something a
> >>>>>> little
> >>>>>>>>>>>> better.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 3) Concerning the default implementation--actually, I meant
> >>>>>>> what I
> >>>>>>>>>>>> wrote
> >>>>>>>>>>>>> :)
> >>>>>>>>>>>>>> I don't want a "second" default, I want an implementation of
> >>>>>>> this
> >>>>>>>>>>>>> interface
> >>>>>>>>>>>>>> to be used as the default if no others are specified. The
> >>>>>>> behavior
> >>>>>>>>> of
> >>>>>>>>>>>>> this
> >>>>>>>>>>>>>> default implementation would be identical to existing
> behavior
> >>>>>>> (so
> >>>>>>>>>>>> there
> >>>>>>>>>>>>>> would be no backwards compatibility concerns like the ones
> >>>>>>> raised
> >>>>>>>> by
> >>>>>>>>>>>>>> Matthias), but it would be possible to configure this
> default
> >>>>>>>>> handler
> >>>>>>>>>>>>> class
> >>>>>>>>>>>>>> to behave differently for a basic set of scenarios. This
> would
> >>>>>>>>> mirror
> >>>>>>>>>>>>> (pun
> >>>>>>>>>>>>>> intended) the approach we've taken with Mirror Maker 2 and
> its
> >>>>>>>>>>>>>> ReplicationPolicy interface [1]. There is a default
> >>>>>>> implementation
> >>>>>>>>>>>>>> available [2] that recognizes a handful of basic
> configuration
> >>>>>>>>>>>> properties
> >>>>>>>>>>>>>> [3] for simple tweaks, but if users want, they can also
> >>>>>>> implement
> >>>>>>>>>> their
> >>>>>>>>>>>>> own
> >>>>>>>>>>>>>> replication policy for more fine-grained logic if those
> >>>>>>> properties
> >>>>>>>>>>>> aren't
> >>>>>>>>>>>>>> flexible enough.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> More concretely, I'm imagining something like this for the
> >>>>>>>> producer
> >>>>>>>>>>>>>> exception handler:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> - Default implementation class
> >>>>>>>>>>>>>> of
> >>>>>>>> org.apache.kafka.clients.producer.DefaultProducerExceptionHandler
> >>>>>>>>>>>>>> - This class would recognize two properties:
> >>>>>>>>>>>>>> - drop.invalid.large.records: Boolean property, defaults to
> >>>>>>>> false.
> >>>>>>>>> If
> >>>>>>>>>>>>>> "false", then causes the handler to return FAIL whenever
> >>>>>>>>>>>>>> a RecordTooLargeException is encountered; if "true", then
> >>>>>> causes
> >>>>>>>>>>>>>> SWALLOW/SKIP/DROP to be returned instead
> >>>>>>>>>>>>>> - unknown.topic.partition.retry.timeout.ms: Integer
> >>>>>> property,
> >>>>>>>>>>>> defaults
> >>>>>>>>>>>>>> to
> >>>>>>>>>>>>>> INT_MAX. Whenever an UnknownTopicOrPartitionException is
> >>>>>>>>> encountered,
> >>>>>>>>>>>>>> causes the handler to return FAIL if that record has been
> >>>>>>> pending
> >>>>>>>>> for
> >>>>>>>>>>>>> more
> >>>>>>>>>>>>>> than the retry timeout; otherwise, causes RETRY to be
> returned
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I think this is worth addressing now instead of later
> because
> >>>>>> it
> >>>>>>>>>> forces
> >>>>>>>>>>>>> us
> >>>>>>>>>>>>>> to evaluate the usefulness of this interface and it
> addresses
> >>>>>> a
> >>>>>>>>>>>>>> long-standing issue not just with Kafka Connect, but with
> the
> >>>>>>> Java
> >>>>>>>>>>>>> producer
> >>>>>>>>>>>>>> in general. For reference, here are a few tickets I
> collected
> >>>>>>>> after
> >>>>>>>>>>>>> briefly
> >>>>>>>>>>>>>> skimming our Jira showing that this is a real pain point for
> >>>>>>>> users:
> >>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-10340,
> >>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-12990,
> >>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-13634. Although
> >>>>>>> this
> >>>>>>>> is
> >>>>>>>>>>>>>> frequently reported with Kafka Connect, it applies to anyone
> >>>>>> who
> >>>>>>>>>>>>> configures
> >>>>>>>>>>>>>> a producer to use a high retry timeout. I am aware of the
> >>>>>>>>>> max.block.ms
> >>>>>>>>>>>>>> property, but it's painful and IMO poor behavior to require
> >>>>>>> users
> >>>>>>>> to
> >>>>>>>>>>>>> reduce
> >>>>>>>>>>>>>> the value of this property just to handle the single
> scenario
> >>>>>>> when
> >>>>>>>>>>>> trying
> >>>>>>>>>>>>>> to write to topics that don't exist, since it would also
> limit
> >>>>>>> the
> >>>>>>>>>>>> retry
> >>>>>>>>>>>>>> timeout for other operations that are legitimately
> retriable.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Raising new points:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 5) I don't see the interplay between this handler and
> existing
> >>>>>>>>>>>>>> retry-related properties mentioned anywhere in the KIP. I'm
> >>>>>>>> assuming
> >>>>>>>>>>>> that
> >>>>>>>>>>>>>> properties like "retries", "max.block.ms", and "
> >>>>>>>> delivery.timeout.ms
> >>>>>>>>> "
> >>>>>>>>>>>>> would
> >>>>>>>>>>>>>> take precedence over the handler and once they are
> exhausted,
> >>>>>>> the
> >>>>>>>>>>>>>> record/batch will fail no matter what? If so, it's probably
> >>>>>>> worth
> >>>>>>>>>>>> briefly
> >>>>>>>>>>>>>> mentioning this (no more than a sentence or two) in the KIP,
> >>>>>> and
> >>>>>>>> if
> >>>>>>>>>>>> not,
> >>>>>>>>>>>>>> I'm curious what you have in mind.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 6) I also wonder if the API provides enough information in
> its
> >>>>>>>>> current
> >>>>>>>>>>>>>> form. Would it be possible to provide handlers with some way
> >>>>>> of
> >>>>>>>>>>>> tracking
> >>>>>>>>>>>>>> how long a record has been pending for (i.e., how long it's
> >>>>>> been
> >>>>>>>>> since
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>> record was provided as an argument to Producer::send)? One
> way
> >>>>>>> to
> >>>>>>>> do
> >>>>>>>>>>>> this
> >>>>>>>>>>>>>> could be to add a method like `onNewRecord(ProducerRecord)`
> >>>>>> and
> >>>>>>>>>>>>>> allow/require the handler to do its own bookkeeping,
> probably
> >>>>>>>> with a
> >>>>>>>>>>>>>> matching `onRecordSuccess(ProducerRecord)` method so that
> the
> >>>>>>>>> handler
> >>>>>>>>>>>>>> doesn't eat up an ever-increasing amount of memory trying to
> >>>>>>> track
> >>>>>>>>>>>>> records.
> >>>>>>>>>>>>>> An alternative could be to include information about the
> >>>>>> initial
> >>>>>>>>> time
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>> record was received by the producer and the number of
> retries
> >>>>>>> that
> >>>>>>>>>> have
> >>>>>>>>>>>>>> been performed for it as parameters in the handle method(s),
> >>>>>> but
> >>>>>>>> I'm
> >>>>>>>>>>>> not
> >>>>>>>>>>>>>> sure how easy this would be to implement and it might
> clutter
> >>>>>>>> things
> >>>>>>>>>>>> up a
> >>>>>>>>>>>>>> bit too much.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 7) A small request--can we add Closeable (or, if you prefer,
> >>>>>>>>>>>>> AutoCloseable)
> >>>>>>>>>>>>>> as a superinterface for the handler interface?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> [1] -
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>
> >>>
> https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/ReplicationPolicy.html
> >>>>>>>>>>>>>> [2] -
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>
> >>>
> https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html
> >>>>>>>>>>>>>> [3] -
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>
> >>>
> https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html#SEPARATOR_CONFIG
> >>>>>>>>>>>>>> ,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>
> >>>
> https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html#INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Chris
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Wed, May 8, 2024 at 12:37 AM Matthias J. Sax <
> >>>>>>> mj...@apache.org
> >>>>>>>>>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Very interesting discussion.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Seems a central point is the question about "how generic"
> we
> >>>>>>>>> approach
> >>>>>>>>>>>>>>> this, and some people think we need to be conservative and
> >>>>>>> others
> >>>>>>>>>>>> think
> >>>>>>>>>>>>>>> we should try to be as generic as possible.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Personally, I think following a limited scope for this KIP
> >>>>>> (by
> >>>>>>>>>>>>>>> explicitly saying we only cover RecordTooLarge and
> >>>>>>>>>>>>>>> UnknownTopicOrPartition) might be better. We have concrete
> >>>>>>>> tickets
> >>>>>>>>>>>> that
> >>>>>>>>>>>>>>> we address, while for other exception (like authorization)
> we
> >>>>>>>> don't
> >>>>>>>>>>>>> know
> >>>>>>>>>>>>>>> if people want to handle it to begin with. Boiling the
> ocean
> >>>>>>>> might
> >>>>>>>>>>>> not
> >>>>>>>>>>>>>>> get us too far, and being somewhat pragmatic might help to
> >>>>>> move
> >>>>>>>>> this
> >>>>>>>>>>>>> KIP
> >>>>>>>>>>>>>>> forward. -- I also agree with Justin and Artem, that we
> want
> >>>>>> to
> >>>>>>>> be
> >>>>>>>>>>>>>>> careful anyway to not allow users to break stuff too
> easily.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> As the same time, I agree that we should setup this change
> >>>>>> in a
> >>>>>>>>>>>> forward
> >>>>>>>>>>>>>>> looking way, and thus having a single generic handler
> allows
> >>>>>> us
> >>>>>>>> to
> >>>>>>>>>>>>> later
> >>>>>>>>>>>>>>> extend the handler more easily. This should also simplify
> >>>>>>> follow
> >>>>>>>> up
> >>>>>>>>>>>> KIP
> >>>>>>>>>>>>>>> that might add new error cases (I actually mentioned one
> more
> >>>>>>> to
> >>>>>>>>>>>> Alieh
> >>>>>>>>>>>>>>> already, but we both agreed that it might be best to
> exclude
> >>>>>> it
> >>>>>>>>> from
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>>> KIP right now, to make the 3.8 deadline. Doing a follow up
> >>>>>> KIP
> >>>>>>> is
> >>>>>>>>> not
> >>>>>>>>>>>>>>> the end of the world.)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> @Chris:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> (2) This sounds fair to me, but not sure how "bad" it
> >>>>>> actually
> >>>>>>>>> would
> >>>>>>>>>>>>> be?
> >>>>>>>>>>>>>>> If the contract is clearly defined, it seems to be fine
> what
> >>>>>>> the
> >>>>>>>>> KIP
> >>>>>>>>>>>>>>> proposes, and given that such a handler is an expert API,
> and
> >>>>>>> we
> >>>>>>>>> can
> >>>>>>>>>>>>>>> provide "best practices" (cf my other comment below in
> >>>>>> [AL1]),
> >>>>>>>>> being
> >>>>>>>>>>>> a
> >>>>>>>>>>>>>>> little bit pragmatic sound fine to me.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Not sure if I understand Justin's argument on this
> question?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> (3) About having a default impl or not. I am fine with
> adding
> >>>>>>>> one,
> >>>>>>>>>>>> even
> >>>>>>>>>>>>>>> if I am not sure why Connect could not just add its own one
> >>>>>> and
> >>>>>>>>> plug
> >>>>>>>>>>>> it
> >>>>>>>>>>>>>>> in (and we would add corresponding configs for Connect, but
> >>>>>> not
> >>>>>>>> for
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>> producer itself)? For this case, we could also do this as a
> >>>>>>>> follow
> >>>>>>>>> up
> >>>>>>>>>>>>>>> KIP, but happy to include it in this KIP to provide value
> to
> >>>>>>>>> Connect
> >>>>>>>>>>>>>>> right away (even if the value might not come right away if
> we
> >>>>>>>> miss
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>> 3.8 deadline due to expanded KIP scope...) --  For KS, we
> >>>>>> would
> >>>>>>>> for
> >>>>>>>>>>>>> sure
> >>>>>>>>>>>>>>> plugin our own impl, and lock down the config such that
> users
> >>>>>>>>> cannot
> >>>>>>>>>>>>> set
> >>>>>>>>>>>>>>> their own handler on the internal producer to begin with.
> >>>>>> Might
> >>>>>>>> be
> >>>>>>>>>>>> good
> >>>>>>>>>>>>>>> to elaborate why the producer should have a default? We
> might
> >>>>>>>>>>>> actually
> >>>>>>>>>>>>>>> want to add this to the KIP right away?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> The key for a default impl would be, to not change the
> >>>>>> current
> >>>>>>>>>>>>> behavior,
> >>>>>>>>>>>>>>> and having no default seems to achieve this. For the two
> >>>>>> cases
> >>>>>>>> you
> >>>>>>>>>>>>>>> mentioned, it's unclear to me what default value on "upper
> >>>>>>> bound
> >>>>>>>> on
> >>>>>>>>>>>>>>> retires" for UnkownTopicOrPartitionException we should set?
> >>>>>>> Seems
> >>>>>>>>> it
> >>>>>>>>>>>>>>> would need to be the same as `delivery.timeout.ms`?
> However,
> >>>>>>> if
> >>>>>>>>>>>> users
> >>>>>>>>>>>>>>> have `delivery.timeout.ms` actually overwritten we would
> >>>>>> need
> >>>>>>> to
> >>>>>>>>> set
> >>>>>>>>>>>>>>> this config somewhat "dynamic"? Is this feasible? If we
> >>>>>>>> hard-code 2
> >>>>>>>>>>>>>>> minutes, it might not be backward compatible. I have the
> >>>>>>>> impression
> >>>>>>>>>>>> we
> >>>>>>>>>>>>>>> might introduce some undesired coupling? -- For the "record
> >>>>>> too
> >>>>>>>>>>>> large"
> >>>>>>>>>>>>>>> case, the config seems to be boolean and setting it to
> >>>>>> `false`
> >>>>>>> by
> >>>>>>>>>>>>>>> default seems to provide backward compatibility.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> @Artem:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> [AL1] While I see the point, I would think having a
> different
> >>>>>>>>>>>> callback
> >>>>>>>>>>>>>>> for every exception might not really be elegant? In the
> end,
> >>>>>>> the
> >>>>>>>>>>>>> handler
> >>>>>>>>>>>>>>> is an very advanced feature anyway, and if it's implemented
> >>>>>> in
> >>>>>>> a
> >>>>>>>>> bad
> >>>>>>>>>>>>>>> way, well, it's a user error -- we cannot protect users
> from
> >>>>>>>>>>>>> everything.
> >>>>>>>>>>>>>>> To me, a handler like this, is to some extend "business
> >>>>>> logic"
> >>>>>>>> and
> >>>>>>>>>>>> if a
> >>>>>>>>>>>>>>> user gets business logic wrong, it's hard to protect them.
> --
> >>>>>>> We
> >>>>>>>>>>>> would
> >>>>>>>>>>>>>>> of course provide best practice guidance in the JaveDocs,
> and
> >>>>>>>>> explain
> >>>>>>>>>>>>>>> that a handler should have explicit `if` statements for
> stuff
> >>>>>>> it
> >>>>>>>>> want
> >>>>>>>>>>>>> to
> >>>>>>>>>>>>>>> handle, and only a single default which return FAIL.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> [AL2] Yes, but for KS we would retry at the application
> >>>>>> layer.
> >>>>>>>> Ie,
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>> TX is not completed, we clean up and setup out task from
> >>>>>>> scratch,
> >>>>>>>>> to
> >>>>>>>>>>>>>>> ensure the pending transaction is completed before we
> resume.
> >>>>>>> If
> >>>>>>>>> the
> >>>>>>>>>>>> TX
> >>>>>>>>>>>>>>> was indeed aborted, we would retry from older offset and
> thus
> >>>>>>>> just
> >>>>>>>>>>>> hit
> >>>>>>>>>>>>>>> the same error again and the loop begins again.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> [AL2 cont.] Similar to AL1, I see such a handler to some
> >>>>>> extend
> >>>>>>>> as
> >>>>>>>>>>>>>>> business logic. If a user puts a bad filter condition in
> >>>>>> their
> >>>>>>> KS
> >>>>>>>>>>>> app,
> >>>>>>>>>>>>>>> and drops messages, it nothing we can do about it, and this
> >>>>>>>> handler
> >>>>>>>>>>>>>>> IMHO, has a similar purpose. This is also the line of
> >>>>>> thinking
> >>>>>>> I
> >>>>>>>>>>>> apply
> >>>>>>>>>>>>>>> to EOS, to address Justin's concern about "should we allow
> to
> >>>>>>>> drop
> >>>>>>>>>>>> for
> >>>>>>>>>>>>>>> EOS", and my answer is "yes", because it's more business
> >>>>>> logic
> >>>>>>>> than
> >>>>>>>>>>>>>>> actual error handling IMHO. And by default, we fail... So
> >>>>>> users
> >>>>>>>>>>>> opt-in
> >>>>>>>>>>>>>>> to add business logic to drop records. It's an application
> >>>>>>> level
> >>>>>>>>>>>>>>> decision how to write the code.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> [AL3] Maybe I misunderstand what you are saying, but to me,
> >>>>>>>>> checking
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>>> size of the record upfront is exactly what the KIP
> proposes?
> >>>>>>> No?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> @Justin:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I saw the sample
> >>>>>>>>>>>>>>>> code -- is it just an if statement checking for the error
> >>>>>>> before
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>> handler is invoked? That seems a bit fragile.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> What do you mean by fragile? Not sure if I see your point.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On 5/7/24 5:33 PM, Artem Livshits wrote:
> >>>>>>>>>>>>>>>> Hi Alieh,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks for the KIP.  The motivation talks about very
> >>>>>> specific
> >>>>>>>>>>>> cases,
> >>>>>>>>>>>>>> but
> >>>>>>>>>>>>>>>> the interface is generic.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> [AL1]
> >>>>>>>>>>>>>>>> If the interface evolves in the future I think we could
> have
> >>>>>>> the
> >>>>>>>>>>>>>>> following
> >>>>>>>>>>>>>>>> confusion:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 1. A user implemented SWALLOW action for both
> >>>>>>>>>>>> RecordTooLargeException
> >>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>> UnknownTopicOrPartitionException.  For simpicity they just
> >>>>>>>> return
> >>>>>>>>>>>>>> SWALLOW
> >>>>>>>>>>>>>>>> from the function, because it elegantly handles all known
> >>>>>>> cases.
> >>>>>>>>>>>>>>>> 2. The interface has evolved to support a new exception.
> >>>>>>>>>>>>>>>> 3. The user has upgraded their Kafka client.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Now a new kind of error gets dropped on the floor without
> >>>>>>> user's
> >>>>>>>>>>>>>>> intention
> >>>>>>>>>>>>>>>> and it would be super hard to detect and debug.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> To avoid the confusion, I think we should use handlers for
> >>>>>>>>> specific
> >>>>>>>>>>>>>>>> exceptions.  Then if a new exception is added it won't get
> >>>>>>>>> silently
> >>>>>>>>>>>>>>> swalled
> >>>>>>>>>>>>>>>> because the user would need to add new functionality to
> >>>>>> handle
> >>>>>>>> it.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I also have some higher level comments:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> [AL2]
> >>>>>>>>>>>>>>>>> it throws a TimeoutException, and the user can only
> blindly
> >>>>>>>>> retry,
> >>>>>>>>>>>>>> which
> >>>>>>>>>>>>>>>> may result in an infinite retry loop
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> If the TimeoutException happens during transactional
> >>>>>>> processing
> >>>>>>>>>>>>>> (exactly
> >>>>>>>>>>>>>>>> once is the desired sematnics), then the client should not
> >>>>>>> retry
> >>>>>>>>>>>> when
> >>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>> gets TimeoutException because without knowing the reason
> for
> >>>>>>>>>>>>>>>> TimeoutExceptions, the client cannot know whether the
> >>>>>> message
> >>>>>>>> got
> >>>>>>>>>>>>>>> actually
> >>>>>>>>>>>>>>>> produced or not and retrying the message may result in
> >>>>>>>>> duplicatees.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> The thrown TimeoutException "cuts" the connection to the
> >>>>>>>>>>>> underlying
> >>>>>>>>>>>>>> root
> >>>>>>>>>>>>>>>> cause of missing metadata
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Maybe we should fix the error handling and return the
> proper
> >>>>>>>>>>>>> underlying
> >>>>>>>>>>>>>>>> message?  Then the application can properly handle the
> >>>>>> message
> >>>>>>>>>>>> based
> >>>>>>>>>>>>> on
> >>>>>>>>>>>>>>>> preferences.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>  From the product perspective, it's not clear how safe it
> is
> >>>>>> to
> >>>>>>>>>>>>> blindly
> >>>>>>>>>>>>>>>> ignore UnknownTopicOrPartitionException.  This could lead
> to
> >>>>>>>>>>>>> situations
> >>>>>>>>>>>>>>>> when a simple typo could lead to massive data loss (part
> of
> >>>>>>> the
> >>>>>>>>>>>> data
> >>>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>> effectively be produced to a "black hole" and the user may
> >>>>>> not
> >>>>>>>>>>>> notice
> >>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>> for a while).
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> In which situations would you recommend the user to "black
> >>>>>>> hole"
> >>>>>>>>>>>>>> messages
> >>>>>>>>>>>>>>>> in case of misconfiguration?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> [AL3]
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> If the custom handler decides on SWALLOW for
> >>>>>>>>>>>>> RecordTooLargeException,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Is it my understanding that this KIP proposes that
> >>>>>>> functionality
> >>>>>>>>>>>> that
> >>>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>> only be able to SWALLOW RecordTooLargeException that
> happen
> >>>>>>>>> because
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>> producer cannot produce the record (if the broker rejects
> >>>>>> the
> >>>>>>>>>>>> batch,
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>> error won't get to the handler, because we cannot know
> which
> >>>>>>>> other
> >>>>>>>>>>>>>>> records
> >>>>>>>>>>>>>>>> get ignored).  In this case, why not just check the
> locally
> >>>>>>>>>>>>> configured
> >>>>>>>>>>>>>>> max
> >>>>>>>>>>>>>>>> record size upfront and not produce the recrord in the
> first
> >>>>>>>>> place?
> >>>>>>>>>>>>>>> Maybe
> >>>>>>>>>>>>>>>> we can expose a validation function from the producer that
> >>>>>>> could
> >>>>>>>>>>>>>> validate
> >>>>>>>>>>>>>>>> the records locally, so we don't need to produce the
> record
> >>>>>> in
> >>>>>>>>>>>> order
> >>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>> know that it's invalid.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> -Artem
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Tue, May 7, 2024 at 2:07 PM Justine Olshan
> >>>>>>>>>>>>>>> <jols...@confluent.io.invalid>
> >>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Alieh and Chris,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks for clarifying 1) but I saw the motivation. I
> guess
> >>>>>> I
> >>>>>>>> just
> >>>>>>>>>>>>>> didn't
> >>>>>>>>>>>>>>>>> understand how that would be ensured on the producer
> side.
> >>>>>> I
> >>>>>>>> saw
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>> sample
> >>>>>>>>>>>>>>>>> code -- is it just an if statement checking for the error
> >>>>>>>> before
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> handler is invoked? That seems a bit fragile.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Can you clarify what you mean by `since the code does not
> >>>>>>> reach
> >>>>>>>>>>>> the
> >>>>>>>>>>>>> KS
> >>>>>>>>>>>>>>>>> interface and breaks somewhere in producer.` If we
> surfaced
> >>>>>>>> this
> >>>>>>>>>>>>> error
> >>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>> the application in a better way would that also be a
> >>>>>> solution
> >>>>>>>> to
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>> issue?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Justine
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Tue, May 7, 2024 at 1:55 PM Alieh Saeedi
> >>>>>>>>>>>>>>> <asae...@confluent.io.invalid>
> >>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thank you, Chris and Justine, for the feedback.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> @Chris
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 1) Flexibility: it has two meanings. The first meaning
> is
> >>>>>>> the
> >>>>>>>>> one
> >>>>>>>>>>>>> you
> >>>>>>>>>>>>>>>>>> mentioned. We are going to cover more exceptions in the
> >>>>>>>> future,
> >>>>>>>>>>>> but
> >>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>>> Justine mentioned, we must be very conservative about
> >>>>>> adding
> >>>>>>>>> more
> >>>>>>>>>>>>>>>>>> exceptions. Additionally, flexibility mainly means that
> >>>>>> the
> >>>>>>>> user
> >>>>>>>>>>>> is
> >>>>>>>>>>>>>>> able
> >>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>> develop their own code. As mentioned in the motivation
> >>>>>>> section
> >>>>>>>>>>>> and
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> examples, sometimes the user decides on dropping a
> record
> >>>>>>>> based
> >>>>>>>>>>>> on
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> topic, for example.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 2) Defining two separate methods for retriable and
> >>>>>>>> non-retriable
> >>>>>>>>>>>>>>>>>> exceptions: although the idea is brilliant, the user may
> >>>>>>> still
> >>>>>>>>>>>>> make a
> >>>>>>>>>>>>>>>>>> mistake by implementing the wrong method and see a
> >>>>>>>> non-expecting
> >>>>>>>>>>>>>>>>> behaviour.
> >>>>>>>>>>>>>>>>>> For example, he may implement handleRetriable() for
> >>>>>>>>>>>>>>>>> RecordTooLargeException
> >>>>>>>>>>>>>>>>>> and define SWALLOW for the exception, but in practice,
> he
> >>>>>>> sees
> >>>>>>>>> no
> >>>>>>>>>>>>>>> change
> >>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>> default behaviour since he implemented the wrong
> method. I
> >>>>>>>> think
> >>>>>>>>>>>> we
> >>>>>>>>>>>>>> can
> >>>>>>>>>>>>>>>>>> never reduce the user’s mistakes to 0.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 3) Default implementation for Handler: the default
> >>>>>> behaviour
> >>>>>>>> is
> >>>>>>>>>>>>>> already
> >>>>>>>>>>>>>>>>>> preserved with NO need of implementing any handler or
> >>>>>>> setting
> >>>>>>>>> the
> >>>>>>>>>>>>>>>>>> corresponding config parameter
> `custom.exception.handler`.
> >>>>>>>> What
> >>>>>>>>>>>> you
> >>>>>>>>>>>>>>> mean
> >>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>> actually having a second default, which requires having
> >>>>>> both
> >>>>>>>>>>>>>> interface
> >>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>> config parameters. About
> UnknownTopicOrPartitionException:
> >>>>>>> the
> >>>>>>>>>>>>>> producer
> >>>>>>>>>>>>>>>>>> already offers the config parameter `max.block.ms`
> which
> >>>>>>>>>>>>> determines
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> duration of retrying. The main purpose of the user who
> >>>>>> needs
> >>>>>>>> the
> >>>>>>>>>>>>>>> handler
> >>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>> to get the root cause of TimeoutException and handle it
> in
> >>>>>>> the
> >>>>>>>>>>>> way
> >>>>>>>>>>>>> he
> >>>>>>>>>>>>>>>>>> intends. The KIP explains the necessity of it for KS
> >>>>>> users.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 4) Naming issue: By SWALLOW, we meant actually swallow
> the
> >>>>>>>>> error,
> >>>>>>>>>>>>>> while
> >>>>>>>>>>>>>>>>>> SKIP means skip the record, I think. If it makes sense
> for
> >>>>>>>> more
> >>>>>>>>>>>>> ppl,
> >>>>>>>>>>>>>> I
> >>>>>>>>>>>>>>>>> can
> >>>>>>>>>>>>>>>>>> change it to SKIP
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> @Justine
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 1) was addressed by Chris.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 2 and 3) The problem is exactly what you mentioned.
> >>>>>>> Currently,
> >>>>>>>>>>>>> there
> >>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>> no
> >>>>>>>>>>>>>>>>>> way to handle these issues application-side. Even KS
> users
> >>>>>>> who
> >>>>>>>>>>>>>>> implement
> >>>>>>>>>>>>>>>>> KS
> >>>>>>>>>>>>>>>>>> ProductionExceptionHandler are not able to handle the
> >>>>>>>> exceptions
> >>>>>>>>>>>> as
> >>>>>>>>>>>>>>> they
> >>>>>>>>>>>>>>>>>> intend since the code does not reach the KS interface
> and
> >>>>>>>> breaks
> >>>>>>>>>>>>>>>>> somewhere
> >>>>>>>>>>>>>>>>>> in producer.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>> Alieh
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Tue, May 7, 2024 at 8:43 PM Chris Egerton <
> >>>>>>>>>>>>>> fearthecel...@gmail.com>
> >>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Hi Justine,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> The method signatures for the interface are indeed
> >>>>>>>> open-ended,
> >>>>>>>>>>>> but
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> KIP
> >>>>>>>>>>>>>>>>>>> states that its uses will be limited. See the
> motivation
> >>>>>>>>>>>> section:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> We believe that the user should be able to develop
> >>>>>> custom
> >>>>>>>>>>>>> exception
> >>>>>>>>>>>>>>>>>>> handlers for managing producer exceptions. On the other
> >>>>>>> hand,
> >>>>>>>>>>>> this
> >>>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>> an expert-level API, and using that may result in
> strange
> >>>>>>>>>>>>> behaviour
> >>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> system, making it hard to find the root cause.
> Therefore,
> >>>>>>> the
> >>>>>>>>>>>>> custom
> >>>>>>>>>>>>>>>>>>> handler is currently limited to handling
> >>>>>>>>> RecordTooLargeException
> >>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>> UnknownTopicOrPartitionException.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Chris
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On Tue, May 7, 2024, 14:37 Justine Olshan
> >>>>>>>>>>>>>>> <jols...@confluent.io.invalid
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Hi Alieh,
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I was out for KSB and then was also sick. :(
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> To your point 1) Chris, I don't think it is limited to
> >>>>>> two
> >>>>>>>>>>>>> specific
> >>>>>>>>>>>>>>>>>>>> scenarios, since the interface accepts a generic
> >>>>>>> Exception e
> >>>>>>>>>>>> and
> >>>>>>>>>>>>>> can
> >>>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>> implemented to check if that e is an instanceof any
> >>>>>>>> exception.
> >>>>>>>>>>>> I
> >>>>>>>>>>>>>>>>> didn't
> >>>>>>>>>>>>>>>>>>> see
> >>>>>>>>>>>>>>>>>>>> anywhere that specific errors are enforced. I'm a bit
> >>>>>>>>> concerned
> >>>>>>>>>>>>>> about
> >>>>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>> actually. I'm concerned about the opened-endedness and
> >>>>>> the
> >>>>>>>>>>>>> contract
> >>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>>>> with transactions. We are allowing the client to make
> >>>>>>>>> decisions
> >>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>> are
> >>>>>>>>>>>>>>>>>>>> somewhat invisible to the server. As an aside, can we
> >>>>>>> build
> >>>>>>>> in
> >>>>>>>>>>>>> log
> >>>>>>>>>>>>>>>>>>> messages
> >>>>>>>>>>>>>>>>>>>> when the handler decides to skip etc a message. I'm
> >>>>>> really
> >>>>>>>>>>>>>> concerned
> >>>>>>>>>>>>>>>>>>> about
> >>>>>>>>>>>>>>>>>>>> messages being silently dropped.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I do think Chris's point 2) about retriable vs non
> >>>>>>> retriable
> >>>>>>>>>>>>> errors
> >>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>> fair. I'm a bit concerned about skipping a unknown
> topic
> >>>>>>> or
> >>>>>>>>>>>>>> partition
> >>>>>>>>>>>>>>>>>>>> exception too early, as there are cases where it can
> be
> >>>>>>>>>>>>> transient.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I'm still a little bit wary of allowing dropping
> records
> >>>>>>> as
> >>>>>>>>>>>> part
> >>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>> EOS
> >>>>>>>>>>>>>>>>>>>> generally as in many cases, these errors signify an
> >>>>>> issue
> >>>>>>>> with
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> original
> >>>>>>>>>>>>>>>>>>>> data. I understand that streams and connect/mirror
> maker
> >>>>>>> may
> >>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>> reasons
> >>>>>>>>>>>>>>>>>>>> they want to progress past these messages, but
> wondering
> >>>>>>> if
> >>>>>>>>>>>> there
> >>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>> way
> >>>>>>>>>>>>>>>>>>>> that can be done application-side. I'm willing to
> accept
> >>>>>>>> this
> >>>>>>>>>>>>> sort
> >>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>> proposal if we can make it clear that this sort of
> thing
> >>>>>>> is
> >>>>>>>>>>>>>> happening
> >>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>> we limit the blast radius for what we can do.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Justine
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On Tue, May 7, 2024 at 9:55 AM Chris Egerton
> >>>>>>>>>>>>>> <chr...@aiven.io.invalid
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Hi Alieh,
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Sorry for the delay, I've been out sick. I still have
> >>>>>>> some
> >>>>>>>>>>>>>> thoughts
> >>>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>> I'd like to see addressed before voting.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 1) If flexibility is the motivation for a pluggable
> >>>>>>>>> interface,
> >>>>>>>>>>>>> why
> >>>>>>>>>>>>>>>>>> are
> >>>>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>>> only limiting the uses for this interface to two very
> >>>>>>>>> specific
> >>>>>>>>>>>>>>>>>>> scenarios?
> >>>>>>>>>>>>>>>>>>>>> Why not also allow, e.g., authorization errors to be
> >>>>>>>> handled
> >>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>> well
> >>>>>>>>>>>>>>>>>>>>> (allowing users to drop records destined for some
> >>>>>>>> off-limits
> >>>>>>>>>>>>>>>>> topics,
> >>>>>>>>>>>>>>>>>> or
> >>>>>>>>>>>>>>>>>>>>> retry for a limited duration in case there's a delay
> in
> >>>>>>> the
> >>>>>>>>>>>>>>>>>> propagation
> >>>>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>> ACL updates)? It'd be nice to see some analysis of
> >>>>>> other
> >>>>>>>>>>>> errors
> >>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>> could
> >>>>>>>>>>>>>>>>>>>>> be handled with this new API, both to avoid the
> >>>>>> follow-up
> >>>>>>>>> work
> >>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>> another
> >>>>>>>>>>>>>>>>>>>>> KIP to address them in the future, and to make sure
> >>>>>> that
> >>>>>>>>> we're
> >>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>>> painting
> >>>>>>>>>>>>>>>>>>>>> ourselves into a corner with the current API in a way
> >>>>>>> that
> >>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>> make
> >>>>>>>>>>>>>>>>>>>>> future modifications difficult.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 2) Something feels a bit off with how retriable vs.
> >>>>>>>>>>>>> non-retriable
> >>>>>>>>>>>>>>>>>>> errors
> >>>>>>>>>>>>>>>>>>>>> are handled with the interface. Why not introduce two
> >>>>>>>>> separate
> >>>>>>>>>>>>>>>>>> methods
> >>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>> handle each case separately? That way there's no
> >>>>>>> ambiguity
> >>>>>>>> or
> >>>>>>>>>>>>>>>>>> implicit
> >>>>>>>>>>>>>>>>>>>>> behavior when, e.g., attempting to retry on a
> >>>>>>>>>>>>>>>>>> RecordTooLargeException.
> >>>>>>>>>>>>>>>>>>>> This
> >>>>>>>>>>>>>>>>>>>>> could be something like `NonRetriableResponse
> >>>>>>>>>>>>>>>>> handle(ProducerRecord,
> >>>>>>>>>>>>>>>>>>>>> Exception)` and `RetriableResponse
> >>>>>>>>>>>>> handleRetriable(ProducerRecord,
> >>>>>>>>>>>>>>>>>>>>> Exception)`, though the exact names and shape can
> >>>>>>> obviously
> >>>>>>>>> be
> >>>>>>>>>>>>>>>>> toyed
> >>>>>>>>>>>>>>>>>>>> with a
> >>>>>>>>>>>>>>>>>>>>> bit.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 3) Although the flexibility of a pluggable interface
> >>>>>> may
> >>>>>>>>>>>> benefit
> >>>>>>>>>>>>>>>>> some
> >>>>>>>>>>>>>>>>>>>>> users' custom producer applications and Kafka Streams
> >>>>>>>>>>>>>> applications,
> >>>>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>>> comes at significant deployment cost for other
> >>>>>>> low-/no-code
> >>>>>>>>>>>>>>>>>>> environments,
> >>>>>>>>>>>>>>>>>>>>> including but not limited to Kafka Connect and
> >>>>>>> MirrorMaker
> >>>>>>>> 2.
> >>>>>>>>>>>>> Can
> >>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>> add
> >>>>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>> default implementation of the exception handler that
> >>>>>>> allows
> >>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>> some
> >>>>>>>>>>>>>>>>>>>> simple
> >>>>>>>>>>>>>>>>>>>>> behavior to be tweaked via configuration property?
> Two
> >>>>>>>> things
> >>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>>> nice to have would be A) an upper bound on the retry
> >>>>>> time
> >>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>> unknown-topic-partition exceptions and B) an option
> to
> >>>>>>> drop
> >>>>>>>>>>>>>> records
> >>>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>> are large enough to trigger a record-too-large
> >>>>>> exception.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 4) I'd still prefer to see "SKIP" or "DROP" instead
> of
> >>>>>>> the
> >>>>>>>>>>>>>> proposed
> >>>>>>>>>>>>>>>>>>>>> "SWALLOW" option, which IMO is opaque and
> non-obvious,
> >>>>>>>>>>>>> especially
> >>>>>>>>>>>>>>>>>> when
> >>>>>>>>>>>>>>>>>>>>> trying to guess the behavior for retriable errors.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Chris
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On Fri, May 3, 2024 at 11:23 AM Alieh Saeedi
> >>>>>>>>>>>>>>>>>>>> <asae...@confluent.io.invalid
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> A summary of the KIP and the discussions:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> The KIP introduces a handler interface for Producer
> in
> >>>>>>>> order
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>> handle
> >>>>>>>>>>>>>>>>>>>>> two
> >>>>>>>>>>>>>>>>>>>>>> exceptions: RecordTooLargeException and
> >>>>>>>>>>>>>>>>>>>> UnknownTopicOrPartitionException.
> >>>>>>>>>>>>>>>>>>>>>> The handler handles the exceptions per-record.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> - Do we need this handler?  [Motivation and Examples
> >>>>>>>>>>>> sections]
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> RecordTooLargeException: 1) In transactions, the
> >>>>>>> producer
> >>>>>>>>>>>>>>>>> collects
> >>>>>>>>>>>>>>>>>>>>> multiple
> >>>>>>>>>>>>>>>>>>>>>> records in batches. Then a RecordTooLargeException
> >>>>>>> related
> >>>>>>>>>>>> to a
> >>>>>>>>>>>>>>>>>>> single
> >>>>>>>>>>>>>>>>>>>>>> record leads to failing the entire batch. A custom
> >>>>>>>> exception
> >>>>>>>>>>>>>>>>>> handler
> >>>>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>>> this case may decide on dropping the record and
> >>>>>>> continuing
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> processing.
> >>>>>>>>>>>>>>>>>>>>>> See Example 1, please. 2) More over, in Kafka
> >>>>>> Streams, a
> >>>>>>>>>>>> record
> >>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>> too
> >>>>>>>>>>>>>>>>>>>>>> large is a poison pill record, and there is no way
> to
> >>>>>>> skip
> >>>>>>>>>>>> over
> >>>>>>>>>>>>>>>>>> it. A
> >>>>>>>>>>>>>>>>>>>>>> handler would allow us to react to this error inside
> >>>>>> the
> >>>>>>>>>>>>>>>>> producer,
> >>>>>>>>>>>>>>>>>>>> i.e.,
> >>>>>>>>>>>>>>>>>>>>>> local to where the error happens, and thus simplify
> >>>>>> the
> >>>>>>>>>>>> overall
> >>>>>>>>>>>>>>>>>> code
> >>>>>>>>>>>>>>>>>>>>>> significantly. Please read the Motivation section
> for
> >>>>>>> more
> >>>>>>>>>>>>>>>>>>> explanation.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> UnknownTopicOrPartitionException: For this case, the
> >>>>>>>>> producer
> >>>>>>>>>>>>>>>>>> handles
> >>>>>>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>>>> exception internally and only issues a WARN log
> about
> >>>>>>>>> missing
> >>>>>>>>>>>>>>>>>>> metadata
> >>>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>> retries internally. Later, when the producer hits "
> >>>>>>>>>>>>>>>>>>> deliver.timeout.ms"
> >>>>>>>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>>>> throws a TimeoutException, and the user can only
> >>>>>> blindly
> >>>>>>>>>>>> retry,
> >>>>>>>>>>>>>>>>>> which
> >>>>>>>>>>>>>>>>>>>> may
> >>>>>>>>>>>>>>>>>>>>>> result in an infinite retry loop. The thrown
> >>>>>>>>> TimeoutException
> >>>>>>>>>>>>>>>>>> "cuts"
> >>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> connection to the underlying root cause of missing
> >>>>>>>> metadata
> >>>>>>>>>>>>>>>>> (which
> >>>>>>>>>>>>>>>>>>>> could
> >>>>>>>>>>>>>>>>>>>>>> indeed be a transient error but is persistent for a
> >>>>>>>>>>>>> non-existing
> >>>>>>>>>>>>>>>>>>>> topic).
> >>>>>>>>>>>>>>>>>>>>>> Thus, there is no programmatic way to break the
> >>>>>> infinite
> >>>>>>>>>>>> retry
> >>>>>>>>>>>>>>>>>> loop.
> >>>>>>>>>>>>>>>>>>>>> Kafka
> >>>>>>>>>>>>>>>>>>>>>> Streams also blindly retries for this case, and the
> >>>>>>>>>>>> application
> >>>>>>>>>>>>>>>>>> gets
> >>>>>>>>>>>>>>>>>>>>> stuck.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> - Having interface vs configuration option:
> >>>>>> [Motivation,
> >>>>>>>>>>>>>>>>> Examples,
> >>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>> Rejected Alternatives sections]
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Our solution is introducing an interface due to the
> >>>>>> full
> >>>>>>>>>>>>>>>>>> flexibility
> >>>>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>> it offers. Sometimes users, especially Kafka Streams
> >>>>>>> ones,
> >>>>>>>>>>>>>>>>>> determine
> >>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> handler's behaviour based on the situation. For
> >>>>>>> example, f
> >>>>>>>>>>>>>>>>>>>>>> acing UnknownTopicOrPartitionException*, *the user
> may
> >>>>>>>> want
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>> raise
> >>>>>>>>>>>>>>>>>>> an
> >>>>>>>>>>>>>>>>>>>>>> error for some topics but retry it for other topics.
> >>>>>>>> Having
> >>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>> configuration
> >>>>>>>>>>>>>>>>>>>>>> option with a fixed set of possibilities does not
> >>>>>> serve
> >>>>>>>> the
> >>>>>>>>>>>>>>>>> user's
> >>>>>>>>>>>>>>>>>>>>>> needs. See Example 2, please.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> - Note on RecordTooLargeException: [Public
> Interfaces
> >>>>>>>>>>>> section]
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> If the custom handler decides on SWALLOW for
> >>>>>>>>>>>>>>>>>> RecordTooLargeException,
> >>>>>>>>>>>>>>>>>>>>> then
> >>>>>>>>>>>>>>>>>>>>>> this record will not be a part of the batch of
> >>>>>>>> transactions
> >>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>>>>> also
> >>>>>>>>>>>>>>>>>>>>>> not be sent to the broker in non-transactional mode.
> >>>>>> So
> >>>>>>> no
> >>>>>>>>>>>>>>>>> worries
> >>>>>>>>>>>>>>>>>>>> about
> >>>>>>>>>>>>>>>>>>>>>> getting a RecordTooLargeException from the broker in
> >>>>>>> this
> >>>>>>>>>>>> case,
> >>>>>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> record will never ever be sent to the broker.
> SWALLOW
> >>>>>>>> means
> >>>>>>>>>>>>> drop
> >>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> record
> >>>>>>>>>>>>>>>>>>>>>> and continue/swallow the error.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> - What if the handle() method implements RETRY for
> >>>>>>>>>>>>>>>>>>>>> RecordTooLargeException?
> >>>>>>>>>>>>>>>>>>>>>> [Proposed Changes section]
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> We have to limit the user to only have FAIL or
> SWALLOW
> >>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>> RecordTooLargeException. Actually, RETRY must be
> equal
> >>>>>>> to
> >>>>>>>>>>>> FAIL.
> >>>>>>>>>>>>>>>>>> This
> >>>>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>> well documented/informed in javadoc.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> - What if the handle() method of the handler throws
> an
> >>>>>>>>>>>>> exception?
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> The handler is expected to have correct code. If it
> >>>>>>> throws
> >>>>>>>>> an
> >>>>>>>>>>>>>>>>>>>> exception,
> >>>>>>>>>>>>>>>>>>>>>> everything fails.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> This is a PoC PR <
> >>>>>>>>> https://github.com/apache/kafka/pull/15846
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> ONLY
> >>>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>> RecordTooLargeException. The code changes related to
> >>>>>>>>>>>>>>>>>>>>>> UnknownTopicOrPartitionException will be added to
> this
> >>>>>>> PR
> >>>>>>>>>>>>> LATER.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Looking forward to your feedback again.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Alieh
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> On Thu, Apr 25, 2024 at 11:46 PM Kirk True <
> >>>>>>>>>>>> k...@kirktrue.pro>
> >>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Hi Alieh,
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Thanks for the updates!
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Comments inline...
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> On Apr 25, 2024, at 1:10 PM, Alieh Saeedi
> >>>>>>>>>>>>>>>>>>>>> <asae...@confluent.io.INVALID
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Thanks a lot for the constructive feedbacks!
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Addressing some of the main concerns:
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> - The `RecordTooLargeException` can be thrown by
> >>>>>>> broker,
> >>>>>>>>>>>>>>>>>> producer
> >>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>> consumer. Of course, the
> `ProducerExceptionHandler`
> >>>>>>>>>>>> interface
> >>>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>>> introduced
> >>>>>>>>>>>>>>>>>>>>>>>> to affect only the exceptions thrown from the
> >>>>>>> producer.
> >>>>>>>>>>>> This
> >>>>>>>>>>>>>>>>>> KIP
> >>>>>>>>>>>>>>>>>>>> very
> >>>>>>>>>>>>>>>>>>>>>>>> specifically means to provide a possibility to
> >>>>>> manage
> >>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> `RecordTooLargeException` thrown from the
> >>>>>>>> Producer.send()
> >>>>>>>>>>>>>>>>>> method.
> >>>>>>>>>>>>>>>>>>>>>> Please
> >>>>>>>>>>>>>>>>>>>>>>>> see “Proposed Changes” section for more clarity. I
> >>>>>>>>>>>>>>>>> investigated
> >>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> issue
> >>>>>>>>>>>>>>>>>>>>>>>> there thoroughly. I hope it can explain the
> concern
> >>>>>>>> about
> >>>>>>>>>>>> how
> >>>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>>> handle
> >>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> errors as well.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> - The problem with Callback: Methods of Callback
> are
> >>>>>>>>> called
> >>>>>>>>>>>>>>>>>> when
> >>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>> record
> >>>>>>>>>>>>>>>>>>>>>>>> sent to the server is acknowledged, while this is
> >>>>>> not
> >>>>>>>> the
> >>>>>>>>>>>>>>>>>> desired
> >>>>>>>>>>>>>>>>>>>>> time
> >>>>>>>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>>>> all exceptions. We intend to handle exceptions
> >>>>>>>> beforehand.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> I guess it makes sense to keep the expectation for
> >>>>>> when
> >>>>>>>>>>>>>>>>> Callback
> >>>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>>> invoked as-is vs. shoehorning more into it.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> - What if the custom handler returns RETRY for
> >>>>>>>>>>>>>>>>>>>>>>> `RecordTooLargeException`? I
> >>>>>>>>>>>>>>>>>>>>>>>> assume changing the producer configuration at
> >>>>>> runtime
> >>>>>>> is
> >>>>>>>>>>>>>>>>>>> possible.
> >>>>>>>>>>>>>>>>>>>> If
> >>>>>>>>>>>>>>>>>>>>>> so,
> >>>>>>>>>>>>>>>>>>>>>>>> RETRY for a too large record is valid because
> maybe
> >>>>>> in
> >>>>>>>> the
> >>>>>>>>>>>>>>>>> next
> >>>>>>>>>>>>>>>>>>>> try,
> >>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> too large record is not poisoning any more. I am
> not
> >>>>>>>> 100%
> >>>>>>>>>>>>>>>>> sure
> >>>>>>>>>>>>>>>>>>>> about
> >>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> technical details, though. Otherwise, we can
> >>>>>> consider
> >>>>>>>> the
> >>>>>>>>>>>>>>>>> RETRY
> >>>>>>>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>>>>>> FAIL
> >>>>>>>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>>>> this exception. Another solution would be to
> >>>>>> consider
> >>>>>>> a
> >>>>>>>>>>>>>>>>>> constant
> >>>>>>>>>>>>>>>>>>>>> number
> >>>>>>>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>>>> times for RETRY which can be useful for other
> >>>>>>> exceptions
> >>>>>>>>> as
> >>>>>>>>>>>>>>>>>> well.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> It’s not presently possible to change the
> >>>>>> configuration
> >>>>>>>> of
> >>>>>>>>>>>> an
> >>>>>>>>>>>>>>>>>>>> existing
> >>>>>>>>>>>>>>>>>>>>>>> Producer at runtime. So if a record hits a
> >>>>>>>>>>>>>>>>>> RecordTooLargeException
> >>>>>>>>>>>>>>>>>>>>> once,
> >>>>>>>>>>>>>>>>>>>>>> no
> >>>>>>>>>>>>>>>>>>>>>>> amount of retrying (with the current Producer) will
> >>>>>>>> change
> >>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>> fact.
> >>>>>>>>>>>>>>>>>>>>> So
> >>>>>>>>>>>>>>>>>>>>>>> I’m still a little stuck on how to handle a
> response
> >>>>>> of
> >>>>>>>>>>>> RETRY
> >>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>> an
> >>>>>>>>>>>>>>>>>>>>>>> “oversized” record.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> - What if the handle() method itself throws an
> >>>>>>>> exception?
> >>>>>>>>> I
> >>>>>>>>>>>>>>>>>> think
> >>>>>>>>>>>>>>>>>>>>>>>> rationally and pragmatically, the behaviour must
> be
> >>>>>>>>> exactly
> >>>>>>>>>>>>>>>>>> like
> >>>>>>>>>>>>>>>>>>>> when
> >>>>>>>>>>>>>>>>>>>>>> no
> >>>>>>>>>>>>>>>>>>>>>>>> custom handler is defined since the user actually
> >>>>>> did
> >>>>>>>> not
> >>>>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>>> working
> >>>>>>>>>>>>>>>>>>>>>>>> handler.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> I’m not convinced that ignoring an errant handler
> is
> >>>>>>> the
> >>>>>>>>>>>> right
> >>>>>>>>>>>>>>>>>>>> choice.
> >>>>>>>>>>>>>>>>>>>>> It
> >>>>>>>>>>>>>>>>>>>>>>> then becomes a silent failure that might have
> >>>>>>>>> repercussions,
> >>>>>>>>>>>>>>>>>>>> depending
> >>>>>>>>>>>>>>>>>>>>> on
> >>>>>>>>>>>>>>>>>>>>>>> the business logic. A user would have to
> proactively
> >>>>>>>> trawls
> >>>>>>>>>>>>>>>>>> through
> >>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>> logs for WARN/ERROR messages to catch it.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Throwing a hard error is pretty draconian, though…
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> - Why not use config parameters instead of an
> >>>>>>> interface?
> >>>>>>>>> As
> >>>>>>>>>>>>>>>>>>>> explained
> >>>>>>>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>>>>> the “Rejected Alternatives” section, we assume
> that
> >>>>>>> the
> >>>>>>>>>>>>>>>>> handler
> >>>>>>>>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>>>>>> used for a greater number of exceptions in the
> >>>>>> future.
> >>>>>>>>>>>>>>>>>> Defining a
> >>>>>>>>>>>>>>>>>>>>>>>> configuration parameter for each exception may
> make
> >>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> configuration a
> >>>>>>>>>>>>>>>>>>>>>>> bit
> >>>>>>>>>>>>>>>>>>>>>>>> messy. Moreover, the handler offers more
> >>>>>> flexibility.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Agreed that the logic-via-configuration approach is
> >>>>>>> weird
> >>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>> limiting.
> >>>>>>>>>>>>>>>>>>>>>>> Forget I ever suggested it ;)
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> I’d think additional background in the Motivation
> >>>>>>> section
> >>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>>>> help
> >>>>>>>>>>>>>>>>>>>> me
> >>>>>>>>>>>>>>>>>>>>>>> understand how users might use this feature beyond
> a)
> >>>>>>>>>>>> skipping
> >>>>>>>>>>>>>>>>>>>>>> “oversized”
> >>>>>>>>>>>>>>>>>>>>>>> records, and b) not retrying missing topics.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Small change:
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> -ProductionExceptionHandlerResponse -> Response
> for
> >>>>>>>>> brevity
> >>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>> simplicity.
> >>>>>>>>>>>>>>>>>>>>>>>> Could’ve been HandlerResponse too I think!
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> The name change sounds good to me.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Thanks Alieh!
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> I thank you all again for your useful
> >>>>>>>>>>>> questions/suggestions.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> I would be happy to hear more of your concerns, as
> >>>>>>>> stated
> >>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>> some
> >>>>>>>>>>>>>>>>>>>>>>> feedback.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>>>>>>>> Alieh
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> On Wed, Apr 24, 2024 at 12:31 AM Justine Olshan
> >>>>>>>>>>>>>>>>>>>>>>>> <jols...@confluent.io.invalid> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Thanks Alieh for the updates.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> I'm a little concerned about the design pattern
> >>>>>> here.
> >>>>>>>> It
> >>>>>>>>>>>>>>>>> seems
> >>>>>>>>>>>>>>>>>>>> like
> >>>>>>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>>>>> want
> >>>>>>>>>>>>>>>>>>>>>>>>> specific usages, but we are packaging it as a
> >>>>>> generic
> >>>>>>>>>>>>>>>>> handler.
> >>>>>>>>>>>>>>>>>>>>>>>>> I think we tried to narrow down on the specific
> >>>>>>> errors
> >>>>>>>> we
> >>>>>>>>>>>>>>>>> want
> >>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>> handle,
> >>>>>>>>>>>>>>>>>>>>>>>>> but it feels a little clunky as we have a generic
> >>>>>>> thing
> >>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>> two
> >>>>>>>>>>>>>>>>>>>>>> specific
> >>>>>>>>>>>>>>>>>>>>>>>>> errors.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> I'm wondering if we are using the right patterns
> to
> >>>>>>>> solve
> >>>>>>>>>>>>>>>>>> these
> >>>>>>>>>>>>>>>>>>>>>>> problems. I
> >>>>>>>>>>>>>>>>>>>>>>>>> agree though that we will need something more
> than
> >>>>>>> the
> >>>>>>>>>>>> error
> >>>>>>>>>>>>>>>>>>>> classes
> >>>>>>>>>>>>>>>>>>>>>> I'm
> >>>>>>>>>>>>>>>>>>>>>>>>> proposing if we want to have different handling
> be
> >>>>>>>>>>>>>>>>>> configurable.
> >>>>>>>>>>>>>>>>>>>>>>>>> My concern is that the open-endedness of a
> handler
> >>>>>>>> means
> >>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>> are
> >>>>>>>>>>>>>>>>>>>>>>>>> creating more problems than we are solving. It is
> >>>>>>> still
> >>>>>>>>>>>>>>>>>> unclear
> >>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>> me
> >>>>>>>>>>>>>>>>>>>>>>> how
> >>>>>>>>>>>>>>>>>>>>>>>>> we expect to handle the errors. Perhaps we could
> >>>>>>>> include
> >>>>>>>>>>>> an
> >>>>>>>>>>>>>>>>>>>> example?
> >>>>>>>>>>>>>>>>>>>>>> It
> >>>>>>>>>>>>>>>>>>>>>>>>> seems like there is a specific use case in mind
> and
> >>>>>>>> maybe
> >>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>> can
> >>>>
> >>>>
> >>>>
> >>>
>

Reply via email to