Hi Matthias,

I like this compromise a lot. +1 from me

Best,

Chris

On Mon, Jun 17, 2024, 16:15 Justine Olshan <jols...@confluent.io.invalid>
wrote:

> Makes sense to me.
>
> Thanks Matthias for summarizing the state.
>
> Justine
>
> On Mon, Jun 17, 2024 at 1:12 PM Matthias J. Sax <mj...@apache.org> wrote:
>
> > Hey,
> >
> > It seems this KIP is very difficult, and we actually had a lot of
> > background discussion about it in the last few weeks. I believe the
> > problem with this KIP is that we have 3 starting points from which to
> > look at the problem, but these 3 starting points don't align:
> >
> >
> >   1. Producer API: we want a clean API design for developers using the
> > producer directly (clean semantics, no footguns...)
> >
> >   2. Kafka Streams: as a power user of the producer, we want to have
> > advanced capabilities; given how KS works internally, we need a "power
> > API" on the producer
> >
> >   3. Kafka Connect: also a power user of the producer. However, Connect
> > is a framework not a programming API and thus prefers a config based
> > approach
> >
> >
> > I also think we got one idea wrong: letting the user code / handler take
> > care of retries. (I guess that's on me, I started with the idea to have
> > a third return code RETRY...) -- the handler does not have enough context
> > information, and making this information available leads to a very
> > clumsy interface. (Defeats (1) from above.)
> >
> > I believe that if we moved forward with the handler, we would need to
> > let the producer do retries, and only call the handler after all
> > retries/timeouts are exhausted. However, for this to work, we need a
> > producer config for Connect, which basically defeats the purpose of
> > making it a programmatic solution for (2) (it seems somewhat redundant).
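
A minimal sketch of the flow described in the paragraph above, where the producer owns the retry loop and the handler is consulted only once retries are exhausted. All names here (`RetryThenHandleSketch`, `ExhaustedRetriesHandler`, `FinalDecision`, `SendAttempt`) are hypothetical stand-ins and not part of the Kafka client, and a fixed attempt count stands in for the real retry/timeout configs.

```java
// Illustrative only: none of these types exist in the Kafka client.
final class RetryThenHandleSketch {
    enum FinalDecision { FAIL, SWALLOW }

    interface ExhaustedRetriesHandler {
        // Consulted once, after the producer has given up retrying.
        FinalDecision onRetriesExhausted(Exception lastError);
    }

    interface SendAttempt {
        void run() throws Exception;
    }

    // Returns true if the record was delivered, false if it was swallowed;
    // throws if the handler decides to FAIL.
    static boolean send(SendAttempt attempt, int maxAttempts, ExhaustedRetriesHandler handler) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int i = 0; i < maxAttempts; i++) {
            try {
                attempt.run();
                return true;
            } catch (Exception e) {
                last = e; // the producer, not the handler, decides to retry
            }
        }
        if (handler.onRetriesExhausted(last) == FinalDecision.SWALLOW) {
            return false;
        }
        throw new IllegalStateException("send failed after " + maxAttempts + " attempts", last);
    }

    private RetryThenHandleSketch() {}
}
```

Note that the handler here has only two outcomes (FAIL or SWALLOW); with the retry loop inside the producer, a RETRY return code is no longer needed in this sketch.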
> >
> > Also, the idea to make the handler configurable, in hindsight, seems
> > like a poor approach / bad compromise to address (3) w/o sacrificing
> > (2), but is a problem for (1).
> >
> >
> > We also discussed the "missing metadata" case, and actually believe we
> > can address it w/o a public API change. Alieh put up a PR for this
> > already: https://github.com/apache/kafka/pull/16344
> >
> >
> > This leaves us with the "producer error state problem for EOS" but it
> > might be better to solve this differently. Alieh started KIP-1059 for
> > this case now.
> >
> >
> > Thus, it seems we should DISCARD this KIP, and the Connect team can do a
> > follow up KIP to add the producer configs they need for their own
> > situation.
> >
> > Splitting the solutions tailored for the different situations seems to
> > lead to an overall cleaner solution to the problem.
> >
> > Thoughts?
> >
> >
> > -Matthias
> >
> >
> >
> > On 5/15/24 12:30 AM, Federico Valeri wrote:
> > > Hello Alieh, thanks for this useful KIP.
> > >
> > > There is a typo in the motivation when you talk about the
> > > UnknownTopicOrPartitionException. It's delivery.timeout.ms, not
> > > deliver.timeout.ms.
> > >
> > > In the past, I did some work to improve and clean up the official Kafka
> > > examples, which I think are useful for new Kafka users. I was
> > > wondering if it is worth improving them in order to show the correct
> > > usage of this new interface. If you agree, maybe we could mention this
> > > in the proposed changes.
> > >
> > >> The accepted responses for RecordTooLargeException are FAIL and
> > >> SWALLOW. Therefore, RETRY will be interpreted and executed as FAIL.
> > >
> > > Why do we need this javadoc note? I think it's not possible to return
> > > RETRY in the current form.
> > >
> > > When we talk about swallowing in the default implementation, I think
> > > we will log an error/warning and drop the record right? If yes, should
> > > we clarify this and improve the DROP_INVALID_LARGE_RECORDS_DOC by
> > > mentioning the logging part?
> > >
> > > Should we mention somewhere which logic takes precedence when both the
> > > interface and configs are used?
> > >
> > > On Tue, May 14, 2024 at 4:45 PM Chris Egerton <chr...@aiven.io.invalid>
> > > wrote:
> > >>
> > >> Hi Alieh,
> > >>
> > >> Thank you for all the updates! One final question--how will the retry
> > >> timeout for unknown topic partition errors be implemented? I think it
> > >> would be best if this could be done with an implementation of the error
> > >> handler, but I don't see a way to track the necessary information with
> > >> the current ProducerExceptionHandler interface.
> > >>
> > >> Cheers,
> > >>
> > >> Chris
> > >>
> > >> On Tue, May 14, 2024 at 9:10 AM Alieh Saeedi <asae...@confluent.io.invalid>
> > >> wrote:
> > >>
> > >>> Thanks Andrew. Done :)
> > >>>
> > >>> @Chris: I changed the config parameter type from boolean to integer,
> > >>> which defines the timeout for retrying. I thought reusing `max.block.ms`
> > >>> was not reasonable, as you mentioned.
> > >>>
> > >>> So if the KIP looks good, let's skip to the good part ;-) VOTING :)
> > >>>
> > >>> Bests,
> > >>> Alieh
> > >>>
> > >>>
> > >>>
> > >>>
> > >>>
> > >>> On Tue, May 14, 2024 at 12:26 PM Andrew Schofield <andrew_schofi...@live.com>
> > >>> wrote:
> > >>>
> > >>>> Hi Alieh,
> > >>>> Just one final comment.
> > >>>>
> > >>>> [AJS5] Existing classes use Retriable, not Retryable. For example:
> > >>>>
> > >>>> https://kafka.apache.org/21/javadoc/org/apache/kafka/common/errors/RetriableException.html
> > >>>>
> > >>>> I suggest RetriableResponse and NonRetriableResponse.
> > >>>>
> > >>>> Thanks,
> > >>>> Andrew
> > >>>>
> > >>>>> On 13 May 2024, at 23:17, Alieh Saeedi <asae...@confluent.io.INVALID>
> > >>>>> wrote:
> > >>>>>
> > >>>>> Hi all,
> > >>>>>
> > >>>>>
> > >>>>> Thanks for all the valid points you listed.
> > >>>>>
> > >>>>>
> > >>>>> KIP updates and addressing concerns:
> > >>>>>
> > >>>>>
> > >>>>> 1) The KIP now suggests two Response types: `RetryableResponse` and
> > >>>>> `NonRetryableResponse`
> > >>>>>
> > >>>>>
> > >>>>> 2) `custom.exception.handler` is changed to
> > >>>>> `custom.exception.handler.class`
> > >>>>>
> > >>>>>
> > >>>>> 3) The KIP clarifies that `In the case of an implemented handler for
> > >>>>> the specified exception, the handler takes precedence.`
> > >>>>>
> > >>>>>
> > >>>>> 4)  There is now a `default` implementation for both handle() methods.
> > >>>>>
> > >>>>>
> > >>>>> 5)  @Chris: for `UnknownTopicOrPartition`, the default is already
> > >>>>> retrying for 60s (in fact, the default value of `max.block.ms`). If the
> > >>>>> handler instructs to FAIL or SWALLOW, there will be no retry, and if the
> > >>>>> handler instructs to RETRY, that will be the default behavior, which
> > >>>>> follows the values in already existing config parameters such as
> > >>>>> `max.block.ms`. Does that make sense?
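
The shape described in points 1) and 4) could look roughly like the sketch below: one response enum per exception family, and a `default` body for each `handle()` overload so that an implementor only overrides what they need. The exception and record classes are stand-ins nested in a hypothetical `HandlerDefaultsSketch` class so that the example compiles without the Kafka client. The chosen defaults (FAIL for oversized records, RETRY for unknown topic/partition) are an assumption based on the behavior discussed in this thread, not confirmed KIP text.

```java
import java.io.Closeable;

// Illustrative only: stand-ins for the types discussed in the thread.
final class HandlerDefaultsSketch {
    static final class RecordTooLargeException extends RuntimeException {}
    static final class UnknownTopicOrPartitionException extends RuntimeException {}

    static final class ProducerRecord {
        final String topic;
        ProducerRecord(String topic) { this.topic = topic; }
    }

    enum NonRetryableResponse { FAIL, SWALLOW }
    enum RetryableResponse { FAIL, SWALLOW, RETRY }

    interface ProducerExceptionHandler extends Closeable {
        // RETRY is not expressible here, so it cannot be returned by mistake.
        default NonRetryableResponse handle(RecordTooLargeException e, ProducerRecord record) {
            return NonRetryableResponse.FAIL;
        }

        default RetryableResponse handle(UnknownTopicOrPartitionException e, ProducerRecord record) {
            return RetryableResponse.RETRY;
        }

        @Override
        default void close() {}
    }

    // Overrides only the oversized-record case; everything else keeps the default.
    static final class DropLargeRecords implements ProducerExceptionHandler {
        @Override
        public NonRetryableResponse handle(RecordTooLargeException e, ProducerRecord record) {
            return NonRetryableResponse.SWALLOW;
        }
    }

    private HandlerDefaultsSketch() {}
}
```

Because the return type of the `RecordTooLargeException` overload has no RETRY constant, the "RETRY will be interpreted as FAIL" rule becomes unnecessary in this sketch; the compiler rejects it instead.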
> > >>>>>
> > >>>>>
> > >>>>> Hope the changes and explanations are convincing :)
> > >>>>>
> > >>>>>
> > >>>>> Cheers,
> > >>>>>
> > >>>>> Alieh
> > >>>>>
> > >>>>> On Mon, May 13, 2024 at 6:40 PM Justine Olshan <jols...@confluent.io.invalid>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> Oh I see. The type isn't the error type but a newly defined type for
> > >>>>>> the response. Makes sense and works for me.
> > >>>>>>
> > >>>>>> Justine
> > >>>>>>
> > >>>>>> On Mon, May 13, 2024 at 9:13 AM Chris Egerton <fearthecel...@gmail.com>
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>>> If we have dedicated methods for each kind of exception
> > >>>>>>> (handleRecordTooLarge, handleUnknownTopicOrPartition, etc.), doesn't
> > >>>>>>> that provide sufficient constraint? I'm not suggesting we eliminate
> > >>>>>>> these methods, just that we change their return types to something
> > >>>>>>> more flexible.
> > >>>>>>>
> > >>>>>>> On Mon, May 13, 2024, 12:07 Justine Olshan <jols...@confluent.io.invalid>
> > >>>>>>> wrote:
> > >>>>>>>
> > >>>>>>>> I'm not sure I agree with the Retriable and NonRetriableResponse
> > >>>>>>>> comment. This doesn't limit the blast radius or enforce certain
> > >>>>>>>> errors are used. I think we might disagree on how controlled these
> > >>>>>>>> interfaces can be...
> > >>>>>>>>
> > >>>>>>>> Justine
> > >>>>>>>>
> > >>>>>>>> On Mon, May 13, 2024 at 8:40 AM Chris Egerton <chr...@aiven.io.invalid>
> > >>>>>>>> wrote:
> > >>>>>>>>
> > >>>>>>>>> Hi Alieh,
> > >>>>>>>>>
> > >>>>>>>>> Thanks for the updates! I just have a few more thoughts:
> > >>>>>>>>>
> > >>>>>>>>> - I don't think a boolean property is sufficient to dictate retries
> > >>>>>>>>> for unknown topic partitions, though. These errors can occur if a
> > >>>>>>>>> topic has just been created, which can occur if, for example,
> > >>>>>>>>> automatic topic creation is enabled for a multi-task connector. This
> > >>>>>>>>> is why I proposed a timeout instead of a boolean (and see my previous
> > >>>>>>>>> email for why reducing max.block.ms for a producer is not a viable
> > >>>>>>>>> alternative). If it helps, one way to reproduce this yourself is to
> > >>>>>>>>> add the line `fooProps.put(TASKS_MAX_CONFIG, "10");` to the
> > >>>>>>>>> integration test here:
> > >>>>>>>>>
> > >>>>>>>>> https://github.com/apache/kafka/blob/5439914c32fa00d634efa7219699f1bc21add839/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java#L134
> > >>>>>>>>>
> > >>>>>>>>> and then check the logs afterward for messages like "Error while
> > >>>>>>>>> fetching metadata with correlation id <n> :
> > >>>>>>>>> {foo-topic=UNKNOWN_TOPIC_OR_PARTITION}".
> > >>>>>>>>>
> > >>>>>>>>> - I also don't think we need custom XxxResponse enums for every
> > >>>>>>>>> possible method; it seems like this will lead to a lot of duplication
> > >>>>>>>>> and cognitive overhead if we want to expand the error handler in the
> > >>>>>>>>> future. Something more flexible like RetriableResponse and
> > >>>>>>>>> NonRetriableResponse could suffice.
> > >>>>>>>>>
> > >>>>>>>>> - Finally, the KIP still doesn't state how the handler will or won't
> > >>>>>>>>> take precedence over existing retry properties. If I set `retries` or
> > >>>>>>>>> `delivery.timeout.ms` or `max.block.ms` to low values, will that
> > >>>>>>>>> cause retries to cease even if my custom handler would otherwise keep
> > >>>>>>>>> returning RETRY for an error?
> > >>>>>>>>>
> > >>>>>>>>> Cheers,
> > >>>>>>>>>
> > >>>>>>>>> Chris
> > >>>>>>>>>
> > >>>>>>>>> On Mon, May 13, 2024 at 11:02 AM Andrew Schofield <andrew_schofi...@live.com>
> > >>>>>>>>> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Hi Alieh,
> > >>>>>>>>>> Just a few more comments on the KIP. It is looking much less risky
> > >>>>>>>>>> now that the scope is tighter.
> > >>>>>>>>>>
> > >>>>>>>>>> [AJS1] It would be nice to have default implementations of the handle
> > >>>>>>>>>> methods so an implementor would not need to implement both themselves.
> > >>>>>>>>>>
> > >>>>>>>>>> [AJS2] Producer configurations which are class names usually end in
> > >>>>>>>>>> “.class”. I suggest “custom.exception.handler.class”.
> > >>>>>>>>>>
> > >>>>>>>>>> [AJS3] If I implemented a handler, and I set a non-default value for
> > >>>>>>>>>> one of the new configurations, what happens? I would expect that the
> > >>>>>>>>>> handler takes precedence. I wasn’t quite clear what “the control will
> > >>>>>>>>>> follow the handler instructions” meant.
> > >>>>>>>>>>
> > >>>>>>>>>> [AJS4] Because you now have an enum for the
> > >>>>>>>>>> RecordTooLargeExceptionResponse, I don’t think you need to state in
> > >>>>>>>>>> the comment for ProducerExceptionHandler that RETRY will be
> > >>>>>>>>>> interpreted as FAIL.
> > >>>>>>>>>>
> > >>>>>>>>>> Thanks,
> > >>>>>>>>>> Andrew
> > >>>>>>>>>>
> > >>>>>>>>>>> On 13 May 2024, at 14:53, Alieh Saeedi <asae...@confluent.io.INVALID>
> > >>>>>>>>>>> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>> Hi all,
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> Thanks for the very interesting discussion during my PTO.
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> KIP updates and addressing concerns:
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> 1) Two handle() methods are defined in ProducerExceptionHandler for
> > >>>>>>>>>>> the two exceptions with different input parameters so that we have
> > >>>>>>>>>>> handle(RecordTooLargeException e, ProducerRecord record) and
> > >>>>>>>>>>> handle(UnknownTopicOrPartitionException e, ProducerRecord record)
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> 2) The ProducerExceptionHandler extends `Closeable` as well.
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> 3) The KIP suggests having two more configuration parameters with
> > >>>>>>>>>>> boolean values:
> > >>>>>>>>>>>
> > >>>>>>>>>>> - `drop.invalid.large.records` with a default value of `false` for
> > >>>>>>>>>>> swallowing too large records.
> > >>>>>>>>>>>
> > >>>>>>>>>>> - `retry.unknown.topic.partition` with a default value of `true` that
> > >>>>>>>>>>> performs RETRY for `max.block.ms` ms when encountering the
> > >>>>>>>>>>> UnknownTopicOrPartitionException.
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> Hope the main concerns are addressed so that we can go forward with
> > >>>>>>>>>>> voting.
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> Cheers,
> > >>>>>>>>>>>
> > >>>>>>>>>>> Alieh
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Thu, May 9, 2024 at 11:25 PM Artem Livshits
> > >>>>>>>>>>> <alivsh...@confluent.io.invalid> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Hi Matthias,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> [AL1] While I see the point, I would think having a different
> > >>>>>>>>>>>>> callback for every exception might not really be elegant?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> I'm not sure how to assess the level of elegance of the proposal, but
> > >>>>>>>>>>>> I can comment on the technical characteristics:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> 1. Having specific interfaces that codify the logic that is currently
> > >>>>>>>>>>>> prescribed in the comments reduces the chance of making a mistake.
> > >>>>>>>>>>>> Comments may get ignored, misunderstood, etc., but if the contract is
> > >>>>>>>>>>>> codified, the compiler will help to enforce the contract.
> > >>>>>>>>>>>> 2. Given that the logic is trickier than it seems (the
> > >>>>>>>>>>>> record-too-large case is an example that can easily confuse someone
> > >>>>>>>>>>>> who's not intimately familiar with the nuances of the batching logic),
> > >>>>>>>>>>>> having a few more hoops to jump through would give a greater chance
> > >>>>>>>>>>>> that whoever tries to add a new case pauses and thinks a bit more.
> > >>>>>>>>>>>> 3. As Justine pointed out, having different methods will be a forcing
> > >>>>>>>>>>>> function to go through a KIP rather than smuggle new cases through
> > >>>>>>>>>>>> implementation.
> > >>>>>>>>>>>> 4. Sort of a consequence of the previous 3 -- all those things reduce
> > >>>>>>>>>>>> the chance of someone writing code that works with 2 errors and then,
> > >>>>>>>>>>>> when more errors are added in the future, suddenly and incorrectly
> > >>>>>>>>>>>> ignores the new errors (the example I gave in the previous email).
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> [AL2 cont.] Similar to AL1, I see such a handler to some extent as
> > >>>>>>>>>>>>> business logic. If a user puts a bad filter condition in their KS
> > >>>>>>>>>>>>> app, and drops messages
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> I agree that there is always a chance to get a bug and lose messages,
> > >>>>>>>>>>>> but there is generally a separation of concerns with different risk
> > >>>>>>>>>>>> profiles: the filtering logic may be more rigorously tested and rarely
> > >>>>>>>>>>>> changed (say an application developer does it), but setting the topics
> > >>>>>>>>>>>> to produce to may be done via configuration (e.g. a user of the
> > >>>>>>>>>>>> application does it) and it's generally an expectation that users
> > >>>>>>>>>>>> would get an error when configuration is incorrect.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> What could be worse is that UnknownTopicOrPartitionException can be an
> > >>>>>>>>>>>> intermittent error, i.e. with a generally correct configuration, there
> > >>>>>>>>>>>> could be a metadata propagation problem on the cluster and then a
> > >>>>>>>>>>>> random set of records could get lost.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> [AL3] Maybe I misunderstand what you are saying, but to me, checking
> > >>>>>>>>>>>>> the size of the record upfront is exactly what the KIP proposes? No?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> It achieves the same result but solves it differently, my proposal:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> 1. Application checks the validity of a record (maybe via a new
> > >>>>>>>>>>>> validateRecord method) before producing it, and can just exclude it or
> > >>>>>>>>>>>> return an error to the user.
> > >>>>>>>>>>>> 2. Application produces the record -- at this point there are no
> > >>>>>>>>>>>> records that could return record too large, they were either skipped
> > >>>>>>>>>>>> at step 1 or we didn't get here because step 1 failed.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Vs. KIP's proposal
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> 1. Application produces the record.
> > >>>>>>>>>>>> 2. Application gets a callback.
> > >>>>>>>>>>>> 3. Application returns the action on how to proceed.
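
A rough sketch of the validate-before-produce alternative, assuming a hypothetical `fitsWithin` helper in place of the `validateRecord` method mentioned above (no such method is confirmed to exist on the producer). The size check only adds raw key and value lengths; the real producer measures the serialized record including headers and framing overhead against `max.request.size`, so this is an approximation for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: a hypothetical pre-send validation step.
final class ValidateBeforeSendSketch {
    // Approximate size check on raw key/value bytes (null counts as empty).
    static boolean fitsWithin(byte[] key, byte[] value, int maxBytes) {
        long size = (key == null ? 0 : key.length) + (long) (value == null ? 0 : value.length);
        return size <= maxBytes;
    }

    // Step 1 of the proposal: keep only values that pass validation, so that
    // step 2 (producing) never sees a record that is known to be too large.
    static List<byte[]> keepValid(List<byte[]> values, int maxBytes) {
        List<byte[]> valid = new ArrayList<>();
        for (byte[] value : values) {
            if (fitsWithin(null, value, maxBytes)) {
                valid.add(value);
            }
        }
        return valid;
    }

    private ValidateBeforeSendSketch() {}
}
```

The decision to skip a record is made by the application before `send()` is called, so no callback or response code is involved.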
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> The advantage of the former is the clarity of semantics -- the record
> > >>>>>>>>>>>> is invalid (property of the record, not a function of server state or
> > >>>>>>>>>>>> server configuration) and we can clearly know that it is the record
> > >>>>>>>>>>>> that is bad and can never succeed.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> The KIP-proposed way actually has a very tricky point: it actually
> > >>>>>>>>>>>> handles a subset of record-too-large exceptions.  The broker can
> > >>>>>>>>>>>> return record-too-large and reject the whole batch (but we don't want
> > >>>>>>>>>>>> to ignore those because then we can skip random records that just
> > >>>>>>>>>>>> happened to be in the same batch), in some sense we use the same error
> > >>>>>>>>>>>> for 2 different conditions and understanding that requires pretty deep
> > >>>>>>>>>>>> understanding of Kafka internals.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> -Artem
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On Wed, May 8, 2024 at 9:47 AM Justine Olshan <jols...@confluent.io.invalid>
> > >>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> My concern with respect to it being fragile: the code that ensures
> > >>>>>>>>>>>>> the error type is internal to the producer. Someone may see it and
> > >>>>>>>>>>>>> say, I want to add such and such error. This looks like internal
> > >>>>>>>>>>>>> code, so I don't need a KIP, and then they can change it to whatever
> > >>>>>>>>>>>>> they want thinking it is within the typical kafka improvement
> > >>>>>>>>>>>>> protocol.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Relying on an internal change to enforce an external API is fragile
> > >>>>>>>>>>>>> in my opinion. That's why I sort of agreed with Artem with enforcing
> > >>>>>>>>>>>>> the error in the method signature -- part of the public API.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Chris's comments on requiring more information to handler again makes
> > >>>>>>>>>>>>> me wonder if we are solving a problem of lack of information at the
> > >>>>>>>>>>>>> application level with a more powerful solution than we need. (Ie, if
> > >>>>>>>>>>>>> we had more information, could the application close and restart the
> > >>>>>>>>>>>>> transaction rather than having to drop records) But I am happy to
> > >>>>>>>>>>>>> compromise with a handler that we can agree is sufficiently
> > >>>>>>>>>>>>> controlled and documented.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Justine
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> On Wed, May 8, 2024 at 7:20 AM Chris Egerton <chr...@aiven.io.invalid>
> > >>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Hi Alieh,
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Continuing prior discussions:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> 1) Regarding the "flexibility" discussion, my overarching point is
> > >>>>>>>>>>>>>> that I don't see the point in allowing for this kind of pluggable
> > >>>>>>>>>>>>>> logic without also covering more scenarios. Take example 2 in the
> > >>>>>>>>>>>>>> KIP: if we're going to implement retries only on "important" topics
> > >>>>>>>>>>>>>> when a topic partition isn't found, why wouldn't we also want to be
> > >>>>>>>>>>>>>> able to do this for other errors? Again, taking authorization errors
> > >>>>>>>>>>>>>> as an example, why wouldn't we want to be able to fail when we can't
> > >>>>>>>>>>>>>> write to "important" topics because the producer principal lacks
> > >>>>>>>>>>>>>> sufficient ACLs, and drop the record if the topic isn't "important"?
> > >>>>>>>>>>>>>> In a security-conscious environment with runtime-dependent topic
> > >>>>>>>>>>>>>> routing (which is a common feature of many source connectors, such
> > >>>>>>>>>>>>>> as the Debezium connectors), this seems fairly likely.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> 2) As far as changing the shape of the API goes, I like Artem's idea
> > >>>>>>>>>>>>>> of splitting out the interface based on specific exceptions. This
> > >>>>>>>>>>>>>> may be a little laborious to expand in the future, but if we really
> > >>>>>>>>>>>>>> want to limit the exceptions that we cover with the handler and move
> > >>>>>>>>>>>>>> slowly and cautiously, then IMO it'd be reasonable to reflect that
> > >>>>>>>>>>>>>> in the interface. I also acknowledge that there's no way to
> > >>>>>>>>>>>>>> completely prevent people from shooting themselves in the foot by
> > >>>>>>>>>>>>>> implementing the API incorrectly, but I think it's worth it to do
> > >>>>>>>>>>>>>> what we can--including leveraging the Java language's type
> > >>>>>>>>>>>>>> system--to help them, so IMO there's value to eliminating the
> > >>>>>>>>>>>>>> implicit behavior of failing when a policy returns RETRY for a
> > >>>>>>>>>>>>>> non-retriable error. This can take a variety of shapes and I'm not
> > >>>>>>>>>>>>>> going to insist on anything specific, but I do want to again raise
> > >>>>>>>>>>>>>> my concerns with the current proposal and request that we find
> > >>>>>>>>>>>>>> something a little better.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> 3) Concerning the default implementation--actually, I meant what I
> > >>>>>>>>>>>>>> wrote :) I don't want a "second" default, I want an implementation
> > >>>>>>>>>>>>>> of this interface to be used as the default if no others are
> > >>>>>>>>>>>>>> specified. The behavior of this default implementation would be
> > >>>>>>>>>>>>>> identical to existing behavior (so there would be no backwards
> > >>>>>>>>>>>>>> compatibility concerns like the ones raised by Matthias), but it
> > >>>>>>>>>>>>>> would be possible to configure this default handler class to behave
> > >>>>>>>>>>>>>> differently for a basic set of scenarios. This would mirror (pun
> > >>>>>>>>>>>>>> intended) the approach we've taken with Mirror Maker 2 and its
> > >>>>>>>>>>>>>> ReplicationPolicy interface [1]. There is a default implementation
> > >>>>>>>>>>>>>> available [2] that recognizes a handful of basic configuration
> > >>>>>>>>>>>>>> properties [3] for simple tweaks, but if users want, they can also
> > >>>>>>>>>>>>>> implement their own replication policy for more fine-grained logic
> > >>>>>>>>>>>>>> if those properties aren't flexible enough.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> More concretely, I'm imagining something like this for the producer
> > >>>>>>>>>>>>>> exception handler:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> - Default implementation class of
> > >>>>>>>>>>>>>> org.apache.kafka.clients.producer.DefaultProducerExceptionHandler
> > >>>>>>>>>>>>>> - This class would recognize two properties:
> > >>>>>>>>>>>>>>   - drop.invalid.large.records: Boolean property, defaults to false.
> > >>>>>>>>>>>>>>     If "false", then causes the handler to return FAIL whenever
> > >>>>>>>>>>>>>>     a RecordTooLargeException is encountered; if "true", then causes
> > >>>>>>>>>>>>>>     SWALLOW/SKIP/DROP to be returned instead
> > >>>>>>>>>>>>>>   - unknown.topic.partition.retry.timeout.ms: Integer property,
> > >>>>>>>>>>>>>>     defaults to INT_MAX. Whenever an UnknownTopicOrPartitionException
> > >>>>>>>>>>>>>>     is encountered, causes the handler to return FAIL if that record
> > >>>>>>>>>>>>>>     has been pending for more than the retry timeout; otherwise,
> > >>>>>>>>>>>>>>     causes RETRY to be returned
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> I think this is worth addressing now instead of later because it
> > >>>>>>>>>>>>>> forces us to evaluate the usefulness of this interface and it
> > >>>>>>>>>>>>>> addresses a long-standing issue not just with Kafka Connect, but
> > >>>>>>>>>>>>>> with the Java producer in general. For reference, here are a few
> > >>>>>>>>>>>>>> tickets I collected after briefly skimming our Jira showing that
> > >>>>>>>>>>>>>> this is a real pain point for users:
> > >>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-10340,
> > >>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-12990,
> > >>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-13634. Although this is
> > >>>>>>>>>>>>>> frequently reported with Kafka Connect, it applies to anyone who
> > >>>>>>>>>>>>>> configures a producer to use a high retry timeout. I am aware of the
> > >>>>>>>>>>>>>> max.block.ms property, but it's painful and IMO poor behavior to
> > >>>>>>>>>>>>>> require users to reduce the value of this property just to handle
> > >>>>>>>>>>>>>> the single scenario when trying to write to topics that don't exist,
> > >>>>>>>>>>>>>> since it would also limit the retry timeout for other operations
> > >>>>>>>>>>>>>> that are legitimately retriable.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Raising new points:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> 5) I don't see the interplay between this handler and existing
> > >>>>>>>>>>>>>> retry-related properties mentioned anywhere in the KIP. I'm assuming
> > >>>>>>>>>>>>>> that properties like "retries", "max.block.ms", and
> > >>>>>>>>>>>>>> "delivery.timeout.ms" would take precedence over the handler and
> > >>>>>>>>>>>>>> once they are exhausted, the record/batch will fail no matter what?
> > >>>>>>>>>>>>>> If so, it's probably worth briefly mentioning this (no more than a
> > >>>>>>>>>>>>>> sentence or two) in the KIP, and if not, I'm curious what you have
> > >>>>>>>>>>>>>> in mind.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> 6) I also wonder if the API provides enough information in its
> > >>>>>>>>>>>>>> current form. Would it be possible to provide handlers with some way
> > >>>>>>>>>>>>>> of tracking how long a record has been pending for (i.e., how long
> > >>>>>>>>>>>>>> it's been since the record was provided as an argument to
> > >>>>>>>>>>>>>> Producer::send)? One way to do this could be to add a method like
> > >>>>>>>>>>>>>> `onNewRecord(ProducerRecord)` and allow/require the handler to do
> > >>>>>>>>>>>>>> its own bookkeeping, probably with a matching
> > >>>>>>>>>>>>>> `onRecordSuccess(ProducerRecord)` method so that the handler doesn't
> > >>>>>>>>>>>>>> eat up an ever-increasing amount of memory trying to track records.
> > >>>>>>>>>>>>>> An alternative could be to include information about the initial
> > >>>>>>>>>>>>>> time the record was received by the producer and the number of
> > >>>>>>>>>>>>>> retries that have been performed for it as parameters in the handle
> > >>>>>>>>>>>>>> method(s), but I'm not sure how easy this would be to implement and
> > >>>>>>>>>>>>>> it might clutter things up a bit too much.
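
The bookkeeping idea in point 6 can be sketched like this. The method names `onNewRecord` and `onRecordSuccess` come from the message; the `PendingRecordTrackerSketch` class, its generic record type, the injected clock, and the `pendingMs`/`tracked` helpers are assumptions made for illustration only.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Illustrative only: tracks how long each record has been pending.
final class PendingRecordTrackerSketch<R> {
    private final Map<R, Long> firstSeenMs = new ConcurrentHashMap<>();
    private final LongSupplier clock;

    // The clock is injected so the sketch can be exercised without sleeping;
    // System::currentTimeMillis would be the obvious production choice.
    PendingRecordTrackerSketch(LongSupplier clock) {
        this.clock = clock;
    }

    // Called when the record is first handed to the producer.
    void onNewRecord(R record) {
        firstSeenMs.putIfAbsent(record, clock.getAsLong());
    }

    // Called on success so that the map does not grow without bound.
    void onRecordSuccess(R record) {
        firstSeenMs.remove(record);
    }

    // Milliseconds since onNewRecord, or 0 if the record is not tracked.
    long pendingMs(R record) {
        Long start = firstSeenMs.get(record);
        return start == null ? 0L : clock.getAsLong() - start;
    }

    int tracked() {
        return firstSeenMs.size();
    }
}
```

One caveat of this sketch: records are keyed by `equals`, so two equal records share a single entry. A real implementation would also need to remove records that fail or are dropped, not only those that succeed.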
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> 7) A small request--can we add Closeable (or, if you prefer,
> > >>>>>>>>>>>>>> AutoCloseable) as a superinterface for the handler interface?
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> [1] - https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/ReplicationPolicy.html
> > >>>>>>>>>>>>>> [2] - https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html
> > >>>>>>>>>>>>>> [3] - https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html#SEPARATOR_CONFIG,
> > >>>>>>>>>>>>>> https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html#INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Chris
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> On Wed, May 8, 2024 at 12:37 AM Matthias J. Sax <
> > >>>>>>> mj...@apache.org
> > >>>>>>>>>
> > >>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Very interesting discussion.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Seems a central point is the question about "how generic"
> > we
> > >>>>>>>>> approach
> > >>>>>>>>>>>>>>> this, and some people think we need to be conservative
> and
> > >>>>>>> others
> > >>>>>>>>>>>> think
> > >>>>>>>>>>>>>>> we should try to be as generic as possible.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Personally, I think following a limited scope for this
> KIP
> > >>>>>> (by
> > >>>>>>>>>>>>>>> explicitly saying we only cover RecordTooLarge and
> > >>>>>>>>>>>>>>> UnknownTopicOrPartition) might be better. We have
> concrete
> > >>>>>>>> tickets
> > >>>>>>>>>>>> that
> > >>>>>>>>>>>>>>> we address, while for other exceptions (like
> authorization)
> > we
> > >>>>>>>> don't
> > >>>>>>>>>>>>> know
> > >>>>>>>>>>>>>>> if people want to handle it to begin with. Boiling the
> > ocean
> > >>>>>>>> might
> > >>>>>>>>>>>> not
> > >>>>>>>>>>>>>>> get us too far, and being somewhat pragmatic might help
> to
> > >>>>>> move
> > >>>>>>>>> this
> > >>>>>>>>>>>>> KIP
> > >>>>>>>>>>>>>>> forward. -- I also agree with Justine and Artem, that we
> > want
> > >>>>>> to
> > >>>>>>>> be
> > >>>>>>>>>>>>>>> careful anyway to not allow users to break stuff too
> > easily.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> At the same time, I agree that we should set up this
> change
> > >>>>>> in a
> > >>>>>>>>>>>> forward
> > >>>>>>>>>>>>>>> looking way, and thus having a single generic handler
> > allows
> > >>>>>> us
> > >>>>>>>> to
> > >>>>>>>>>>>>> later
> > >>>>>>>>>>>>>>> extend the handler more easily. This should also simplify
> > >>>>>>> follow
> > >>>>>>>> up
> > >>>>>>>>>>>> KIP
> > >>>>>>>>>>>>>>> that might add new error cases (I actually mentioned one
> > more
> > >>>>>>> to
> > >>>>>>>>>>>> Alieh
> > >>>>>>>>>>>>>>> already, but we both agreed that it might be best to
> > exclude
> > >>>>>> it
> > >>>>>>>>> from
> > >>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>> KIP right now, to make the 3.8 deadline. Doing a follow
> up
> > >>>>>> KIP
> > >>>>>>> is
> > >>>>>>>>> not
> > >>>>>>>>>>>>>>> the end of the world.)
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> @Chris:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> (2) This sounds fair to me, but not sure how "bad" it
> > >>>>>> actually
> > >>>>>>>>> would
> > >>>>>>>>>>>>> be?
> > >>>>>>>>>>>>>>> If the contract is clearly defined, it seems to be fine
> > what
> > >>>>>>> the
> > >>>>>>>>> KIP
> > >>>>>>>>>>>>>>> proposes, and given that such a handler is an expert API,
> > and
> > >>>>>>> we
> > >>>>>>>>> can
> > >>>>>>>>>>>>>>> provide "best practices" (cf my other comment below in
> > >>>>>> [AL1]),
> > >>>>>>>>> being
> > >>>>>>>>>>>> a
> > >>>>>>>>>>>>>>> little bit pragmatic sounds fine to me.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Not sure if I understand Justine's argument on this
> > question?
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> (3) About having a default impl or not. I am fine with
> > adding
> > >>>>>>>> one,
> > >>>>>>>>>>>> even
> > >>>>>>>>>>>>>>> if I am not sure why Connect could not just add its own
> one
> > >>>>>> and
> > >>>>>>>>> plug
> > >>>>>>>>>>>> it
> > >>>>>>>>>>>>>>> in (and we would add corresponding configs for Connect,
> but
> > >>>>>> not
> > >>>>>>>> for
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>>> producer itself)? For this case, we could also do this
> as a
> > >>>>>>>> follow
> > >>>>>>>>> up
> > >>>>>>>>>>>>>>> KIP, but happy to include it in this KIP to provide value
> > to
> > >>>>>>>>> Connect
> > >>>>>>>>>>>>>>> right away (even if the value might not come right away
> if
> > we
> > >>>>>>>> miss
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>>> 3.8 deadline due to expanded KIP scope...) --  For KS, we
> > >>>>>> would
> > >>>>>>>> for
> > >>>>>>>>>>>>> sure
> > >>>>>>>>>>>>>>> plug in our own impl, and lock down the config such that
> > users
> > >>>>>>>>> cannot
> > >>>>>>>>>>>>> set
> > >>>>>>>>>>>>>>> their own handler on the internal producer to begin with.
> > >>>>>> Might
> > >>>>>>>> be
> > >>>>>>>>>>>> good
> > >>>>>>>>>>>>>>> to elaborate why the producer should have a default? We
> > might
> > >>>>>>>>>>>> actually
> > >>>>>>>>>>>>>>> want to add this to the KIP right away?
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> The key for a default impl would be, to not change the
> > >>>>>> current
> > >>>>>>>>>>>>> behavior,
> > >>>>>>>>>>>>>>> and having no default seems to achieve this. For the two
> > >>>>>> cases
> > >>>>>>>> you
> > >>>>>>>>>>>>>>> mentioned, it's unclear to me what default value on
> "upper
> > >>>>>>> bound
> > >>>>>>>> on
> > >>>>>>>>>>>>>>> retries" for UnknownTopicOrPartitionException we should
> set?
> > >>>>>>> Seems
> > >>>>>>>>> it
> > >>>>>>>>>>>>>>> would need to be the same as `delivery.timeout.ms`?
> > However,
> > >>>>>>> if
> > >>>>>>>>>>>> users
> > >>>>>>>>>>>>>>> have `delivery.timeout.ms` actually overwritten we would
> > >>>>>> need
> > >>>>>>> to
> > >>>>>>>>> set
> > >>>>>>>>>>>>>>> this config somewhat "dynamic"? Is this feasible? If we
> > >>>>>>>> hard-code 2
> > >>>>>>>>>>>>>>> minutes, it might not be backward compatible. I have the
> > >>>>>>>> impression
> > >>>>>>>>>>>> we
> > >>>>>>>>>>>>>>> might introduce some undesired coupling? -- For the
> "record
> > >>>>>> too
> > >>>>>>>>>>>> large"
> > >>>>>>>>>>>>>>> case, the config seems to be boolean and setting it to
> > >>>>>> `false`
> > >>>>>>> by
> > >>>>>>>>>>>>>>> default seems to provide backward compatibility.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> @Artem:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> [AL1] While I see the point, I would think having a
> > different
> > >>>>>>>>>>>> callback
> > >>>>>>>>>>>>>>> for every exception might not really be elegant? In the
> > end,
> > >>>>>>> the
> > >>>>>>>>>>>>> handler
> > >>>>>>>>>>>>>>> is a very advanced feature anyway, and if it's
> implemented
> > >>>>>> in
> > >>>>>>> a
> > >>>>>>>>> bad
> > >>>>>>>>>>>>>>> way, well, it's a user error -- we cannot protect users
> > from
> > >>>>>>>>>>>>> everything.
> > >>>>>>>>>>>>>>> To me, a handler like this is to some extent "business
> > >>>>>> logic"
> > >>>>>>>> and
> > >>>>>>>>>>>> if a
> > >>>>>>>>>>>>>>> user gets business logic wrong, it's hard to protect
> them.
> > --
> > >>>>>>> We
> > >>>>>>>>>>>> would
> > >>>>>>>>>>>>>>> of course provide best practice guidance in the JavaDocs,
> > and
> > >>>>>>>>> explain
> > >>>>>>>>>>>>>>> that a handler should have explicit `if` statements for
> > stuff
> > >>>>>>> it
> > >>>>>>>>> wants
> > >>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>> handle, and only a single default which returns FAIL.
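
The "explicit `if` statements plus a single default that returns FAIL" guidance could be sketched in Java as follows. This is a minimal illustration under assumptions: the interface name, its method signature, and the exception classes are hypothetical stand-ins declared locally so the example compiles without kafka-clients, and they are not the KIP's final API. Only the FAIL and SWALLOW responses come from the discussion.

```java
// Hypothetical stand-ins, not the real Kafka classes or the final KIP interface.
final class HandlerSketch {
    enum Response { FAIL, SWALLOW }

    static class RecordTooLargeException extends RuntimeException {}
    static class UnknownTopicOrPartitionException extends RuntimeException {}

    interface ProducerExceptionHandler {
        Response handle(String topic, Exception exception);
    }

    static final class ExplicitHandler implements ProducerExceptionHandler {
        @Override
        public Response handle(String topic, Exception exception) {
            // Opt in explicitly, one exception type at a time.
            if (exception instanceof RecordTooLargeException) {
                return Response.SWALLOW;
            }
            // Single default: anything not listed above fails, including
            // exception types that a later client version might start passing in.
            return Response.FAIL;
        }
    }
}
```

With this shape, an exception type added by a later release falls through to FAIL instead of being dropped silently, which is the scenario [AL1] worries about when a handler returns SWALLOW unconditionally.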
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> [AL2] Yes, but for KS we would retry at the application
> > >>>>>> layer.
> > >>>>>>>> Ie,
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>>> TX is not completed, we clean up and setup out task from
> > >>>>>>> scratch,
> > >>>>>>>>> to
> > >>>>>>>>>>>>>>> ensure the pending transaction is completed before we
> > resume.
> > >>>>>>> If
> > >>>>>>>>> the
> > >>>>>>>>>>>> TX
> > >>>>>>>>>>>>>>> was indeed aborted, we would retry from older offset and
> > thus
> > >>>>>>>> just
> > >>>>>>>>>>>> hit
> > >>>>>>>>>>>>>>> the same error again and the loop begins again.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> [AL2 cont.] Similar to AL1, I see such a handler to some
> > >>>>>> extent
> > >>>>>>>> as
> > >>>>>>>>>>>>>>> business logic. If a user puts a bad filter condition in
> > >>>>>> their
> > >>>>>>> KS
> > >>>>>>>>>>>> app,
> > >>>>>>>>>>>>>>> and drops messages, there is nothing we can do about it, and
> this
> > >>>>>>>> handler
> > >>>>>>>>>>>>>>> IMHO, has a similar purpose. This is also the line of
> > >>>>>> thinking
> > >>>>>>> I
> > >>>>>>>>>>>> apply
> > >>>>>>>>>>>>>>> to EOS, to address Justine's concern about "should we
> allow
> > to
> > >>>>>>>> drop
> > >>>>>>>>>>>> for
> > >>>>>>>>>>>>>>> EOS", and my answer is "yes", because it's more business
> > >>>>>> logic
> > >>>>>>>> than
> > >>>>>>>>>>>>>>> actual error handling IMHO. And by default, we fail... So
> > >>>>>> users
> > >>>>>>>>>>>> opt-in
> > >>>>>>>>>>>>>>> to add business logic to drop records. It's an
> application
> > >>>>>>> level
> > >>>>>>>>>>>>>>> decision how to write the code.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> [AL3] Maybe I misunderstand what you are saying, but to
> me,
> > >>>>>>>>> checking
> > >>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>> size of the record upfront is exactly what the KIP
> > proposes?
> > >>>>>>> No?
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> @Justine:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> I saw the sample
> > >>>>>>>>>>>>>>>> code -- is it just an if statement checking for the
> error
> > >>>>>>> before
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>> handler is invoked? That seems a bit fragile.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> What do you mean by fragile? Not sure if I see your
> point.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> -Matthias
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On 5/7/24 5:33 PM, Artem Livshits wrote:
> > >>>>>>>>>>>>>>>> Hi Alieh,
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Thanks for the KIP.  The motivation talks about very
> > >>>>>> specific
> > >>>>>>>>>>>> cases,
> > >>>>>>>>>>>>>> but
> > >>>>>>>>>>>>>>>> the interface is generic.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> [AL1]
> > >>>>>>>>>>>>>>>> If the interface evolves in the future I think we could
> > have
> > >>>>>>> the
> > >>>>>>>>>>>>>>> following
> > >>>>>>>>>>>>>>>> confusion:
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> 1. A user implemented SWALLOW action for both
> > >>>>>>>>>>>> RecordTooLargeException
> > >>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>> UnknownTopicOrPartitionException.  For simplicity they
> just
> > >>>>>>>> return
> > >>>>>>>>>>>>>> SWALLOW
> > >>>>>>>>>>>>>>>> from the function, because it elegantly handles all
> known
> > >>>>>>> cases.
> > >>>>>>>>>>>>>>>> 2. The interface has evolved to support a new exception.
> > >>>>>>>>>>>>>>>> 3. The user has upgraded their Kafka client.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Now a new kind of error gets dropped on the floor
> without
> > >>>>>>> user's
> > >>>>>>>>>>>>>>> intention
> > >>>>>>>>>>>>>>>> and it would be super hard to detect and debug.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> To avoid the confusion, I think we should use handlers
> for
> > >>>>>>>>> specific
> > >>>>>>>>>>>>>>>> exceptions.  Then if a new exception is added it won't
> get
> > >>>>>>>>> silently
> > >>>>>>>>>>>>>>> swallowed
> > >>>>>>>>>>>>>>>> because the user would need to add new functionality to
> > >>>>>> handle
> > >>>>>>>> it.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> I also have some higher level comments:
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> [AL2]
> > >>>>>>>>>>>>>>>>> it throws a TimeoutException, and the user can only
> > blindly
> > >>>>>>>>> retry,
> > >>>>>>>>>>>>>> which
> > >>>>>>>>>>>>>>>> may result in an infinite retry loop
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> If the TimeoutException happens during transactional
> > >>>>>>> processing
> > >>>>>>>>>>>>>> (exactly
> > >>>>>>>>>>>>>>>> once is the desired semantics), then the client should
> not
> > >>>>>>> retry
> > >>>>>>>>>>>> when
> > >>>>>>>>>>>>>> it
> > >>>>>>>>>>>>>>>> gets TimeoutException because without knowing the reason
> > for
> > >>>>>>>>>>>>>>>> TimeoutExceptions, the client cannot know whether the
> > >>>>>> message
> > >>>>>>>> got
> > >>>>>>>>>>>>>>> actually
> > >>>>>>>>>>>>>>>> produced or not and retrying the message may result in
> > >>>>>>>>> duplicates.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> The thrown TimeoutException "cuts" the connection to
> the
> > >>>>>>>>>>>> underlying
> > >>>>>>>>>>>>>> root
> > >>>>>>>>>>>>>>>> cause of missing metadata
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Maybe we should fix the error handling and return the
> > proper
> > >>>>>>>>>>>>> underlying
> > >>>>>>>>>>>>>>>> message?  Then the application can properly handle the
> > >>>>>> message
> > >>>>>>>>>>>> based
> > >>>>>>>>>>>>> on
> > >>>>>>>>>>>>>>>> preferences.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>  From the product perspective, it's not clear how safe
> it
> > is
> > >>>>>> to
> > >>>>>>>>>>>>> blindly
> > >>>>>>>>>>>>>>>> ignore UnknownTopicOrPartitionException.  This could
> lead
> > to
> > >>>>>>>>>>>>> situations
> > >>>>>>>>>>>>>>>> when a simple typo could lead to massive data loss (part
> > of
> > >>>>>>> the
> > >>>>>>>>>>>> data
> > >>>>>>>>>>>>>>> would
> > >>>>>>>>>>>>>>>> effectively be produced to a "black hole" and the user
> may
> > >>>>>> not
> > >>>>>>>>>>>> notice
> > >>>>>>>>>>>>>> it
> > >>>>>>>>>>>>>>>> for a while).
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> In which situations would you recommend the user to
> "black
> > >>>>>>> hole"
> > >>>>>>>>>>>>>> messages
> > >>>>>>>>>>>>>>>> in case of misconfiguration?
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> [AL3]
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> If the custom handler decides on SWALLOW for
> > >>>>>>>>>>>>> RecordTooLargeException,
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> It is my understanding that this KIP proposes
> > >>>>>>> functionality
> > >>>>>>>>>>>> that
> > >>>>>>>>>>>>>>> would
> > >>>>>>>>>>>>>>>> only be able to SWALLOW RecordTooLargeException that
> > happen
> > >>>>>>>>> because
> > >>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>> producer cannot produce the record (if the broker
> rejects
> > >>>>>> the
> > >>>>>>>>>>>> batch,
> > >>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>> error won't get to the handler, because we cannot know
> > which
> > >>>>>>>> other
> > >>>>>>>>>>>>>>> records
> > >>>>>>>>>>>>>>>> get ignored).  In this case, why not just check the
> > locally
> > >>>>>>>>>>>>> configured
> > >>>>>>>>>>>>>>> max
> > >>>>>>>>>>>>>>>> record size upfront and not produce the recrord in the
> > first
> > >>>>>>>>> place?
> > >>>>>>>>>>>>>>> Maybe
> > >>>>>>>>>>>>>>>> we can expose a validation function from the producer
> that
> > >>>>>>> could
> > >>>>>>>>>>>>>> validate
> > >>>>>>>>>>>>>>>> the records locally, so we don't need to produce the
> > record
> > >>>>>> in
> > >>>>>>>>>>>> order
> > >>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>> know that it's invalid.
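
The local validation suggested in [AL3] might look like the rough Java sketch below. It assumes the relevant limit is the producer's `max.request.size` setting and that serialized key and value bytes are already at hand. `RecordSizeCheck`, `fitsLocally`, and the fixed `overheadBytes` allowance are hypothetical; the producer's own size estimate also accounts for headers and batch framing, so this is only an approximation and not an existing producer API.

```java
// Hypothetical pre-send check, not part of the producer API.
final class RecordSizeCheck {
    /**
     * Returns true if the serialized key and value, plus a caller-chosen
     * overhead allowance, fit within the configured maximum request size.
     */
    static boolean fitsLocally(byte[] key, byte[] value,
                               int maxRequestSizeBytes, int overheadBytes) {
        long size = (long) overheadBytes
                + (key == null ? 0 : key.length)
                + (value == null ? 0 : value.length);
        return size <= maxRequestSizeBytes;
    }
}
```

An application could run such a check before calling send and route oversized records elsewhere, at the cost of serializing the record itself first.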
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> -Artem
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> On Tue, May 7, 2024 at 2:07 PM Justine Olshan
> > >>>>>>>>>>>>>>> <jols...@confluent.io.invalid>
> > >>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Alieh and Chris,
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Thanks for clarifying 1) but I saw the motivation. I
> > guess
> > >>>>>> I
> > >>>>>>>> just
> > >>>>>>>>>>>>>> didn't
> > >>>>>>>>>>>>>>>>> understand how that would be ensured on the producer
> > side.
> > >>>>>> I
> > >>>>>>>> saw
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>>> sample
> > >>>>>>>>>>>>>>>>> code -- is it just an if statement checking for the
> error
> > >>>>>>>> before
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>> handler is invoked? That seems a bit fragile.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Can you clarify what you mean by `since the code does
> not
> > >>>>>>> reach
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>> KS
> > >>>>>>>>>>>>>>>>> interface and breaks somewhere in producer.` If we
> > surfaced
> > >>>>>>>> this
> > >>>>>>>>>>>>> error
> > >>>>>>>>>>>>>>> to
> > >>>>>>>>>>>>>>>>> the application in a better way would that also be a
> > >>>>>> solution
> > >>>>>>>> to
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>>> issue?
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Justine
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> On Tue, May 7, 2024 at 1:55 PM Alieh Saeedi
> > >>>>>>>>>>>>>>> <asae...@confluent.io.invalid>
> > >>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Hi,
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
