Hi Alieh,

Thanks for the updates! I just have a few more thoughts:

- I don't think a boolean property is sufficient to dictate retries for
unknown topic partitions, though. These errors can occur if a topic has
just been created, which can occur if, for example, automatic topic
creation is enabled for a multi-task connector. This is why I proposed a
timeout instead of a boolean (and see my previous email for why reducing
max.block.ms for a producer is not a viable alternative). If it helps, one
way to reproduce this yourself is to add the line
`fooProps.put(TASKS_MAX_CONFIG, "10");` to the integration test here:
https://github.com/apache/kafka/blob/5439914c32fa00d634efa7219699f1bc21add839/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java#L134
and then check the logs afterward for messages like "Error while fetching
metadata with correlation id <n> : {foo-topic=UNKNOWN_TOPIC_OR_PARTITION}".

- I also don't think we need custom XxxResponse enums for every possible
method; it seems like this will lead to a lot of duplication and cognitive
overhead if we want to expand the error handler in the future. Something
more flexible like RetriableResponse and NonRetriableResponse could suffice.

- Finally, the KIP still doesn't state how the handler will or won't take
precedence over existing retry properties. If I set `retries` or
`delivery.timeout.ms` or `max.block.ms` to low values, will that cause
retries to cease even if my custom handler would otherwise keep returning
RETRY for an error?
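
To make the first two points a bit more concrete, here is a rough sketch of
how a timeout-based default handler with two shared response types could
look. Everything in it is hypothetical: the nested exception classes are
stand-ins for the real Kafka ones so that the sketch compiles on its own,
`TimeoutBoundHandler` and the `pending` parameter are made-up names, and the
method signatures are not the ones in the KIP.

```java
import java.time.Duration;

final class HandlerSketch {
    // Hypothetical shared response types (names follow the suggestion above, not the KIP).
    enum RetriableResponse { RETRY, FAIL, SKIP }
    enum NonRetriableResponse { FAIL, SKIP } // no RETRY constant, so the compiler rules it out

    // Stand-ins for the real Kafka exceptions so this sketch compiles without kafka-clients.
    static class RecordTooLargeException extends RuntimeException { }
    static class UnknownTopicOrPartitionException extends RuntimeException { }

    // Hypothetical handler interface; "pending" is how long the record has waited since send().
    interface ProducerExceptionHandler {
        NonRetriableResponse handle(RecordTooLargeException e, String topic);
        RetriableResponse handle(UnknownTopicOrPartitionException e, String topic, Duration pending);
    }

    // Hypothetical default: a boolean for oversized records, a timeout for unknown topics.
    static final class TimeoutBoundHandler implements ProducerExceptionHandler {
        private final boolean dropLargeRecords;
        private final Duration retryTimeout;

        TimeoutBoundHandler(boolean dropLargeRecords, Duration retryTimeout) {
            this.dropLargeRecords = dropLargeRecords;
            this.retryTimeout = retryTimeout;
        }

        @Override
        public NonRetriableResponse handle(RecordTooLargeException e, String topic) {
            return dropLargeRecords ? NonRetriableResponse.SKIP : NonRetriableResponse.FAIL;
        }

        @Override
        public RetriableResponse handle(UnknownTopicOrPartitionException e, String topic, Duration pending) {
            // FAIL once the record has been pending longer than the timeout, RETRY otherwise.
            return pending.compareTo(retryTimeout) > 0 ? RetriableResponse.FAIL : RetriableResponse.RETRY;
        }
    }

    public static void main(String[] args) {
        TimeoutBoundHandler handler = new TimeoutBoundHandler(false, Duration.ofSeconds(30));
        // A record that has been pending for 5s, then one pending for 45s, against a 30s timeout.
        System.out.println(handler.handle(new UnknownTopicOrPartitionException(), "foo-topic", Duration.ofSeconds(5)));
        System.out.println(handler.handle(new UnknownTopicOrPartitionException(), "foo-topic", Duration.ofSeconds(45)));
        System.out.println(handler.handle(new RecordTooLargeException(), "foo-topic"));
    }
}
```

With this shape, returning RETRY for a RecordTooLargeException cannot be
expressed at all, and a record sent to a topic that was only just created
keeps being retried until the timeout elapses instead of being governed by a
single boolean. How such a timeout would interact with `retries`,
`delivery.timeout.ms` and `max.block.ms` is still the open question from the
last point.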

Cheers,

Chris

On Mon, May 13, 2024 at 11:02 AM Andrew Schofield <andrew_schofi...@live.com>
wrote:

> Hi Alieh,
> Just a few more comments on the KIP. It is looking much less risky now the
> scope is tighter.
>
> [AJS1] It would be nice to have default implementations of the handle
> methods so an implementor would not need to implement both themselves.
>
> [AJS2] Producer configurations which are class names usually end in
> “.class”. I suggest “custom.exception.handler.class”.
>
> [AJS3] If I implemented a handler, and I set a non-default value for one
> of the new configurations, what happens? I would expect that the handler
> takes precedence. I wasn’t quite clear what “the control will follow the
> handler instructions” meant.
>
> [AJS4] Because you now have an enum for the
> RecordTooLargeExceptionResponse, I don’t think you need to state in the
> comment for ProducerExceptionHandler that RETRY will be interpreted as
> FAIL.
>
> Thanks,
> Andrew
>
> > On 13 May 2024, at 14:53, Alieh Saeedi <asae...@confluent.io.INVALID>
> wrote:
> >
> > Hi all,
> >
> >
> > Thanks for the very interesting discussion during my PTO.
> >
> >
> > KIP updates and addressing concerns:
> >
> >
> > 1) Two handle() methods are defined in ProducerExceptionHandler for the
> two
> > exceptions with different input parameters so that we have
> > handle(RecordTooLargeException e, ProducerRecord record) and
> > handle(UnknownTopicOrPartitionException e, ProducerRecord record)
> >
> >
> > 2) The ProducerExceptionHandler extends `Closeable` as well.
> >
> >
> > 3) The KIP suggests having two more configuration parameters with boolean
> > values:
> >
> > - `drop.invalid.large.records` with a default value of `false` for
> > swallowing too large records.
> >
> > - `retry.unknown.topic.partition` with a default value of `true` that
> > performs RETRY for `max.block.ms` ms when encountering an
> > UnknownTopicOrPartitionException.
> >
> >
> > Hope the main concerns are addressed so that we can go forward with
> voting.
> >
> >
> > Cheers,
> >
> > Alieh
> >
> > On Thu, May 9, 2024 at 11:25 PM Artem Livshits
> > <alivsh...@confluent.io.invalid> wrote:
> >
> >> Hi Matthias,
> >>
> >>> [AL1] While I see the point, I would think having a different callback
> >> for every exception might not really be elegant?
> >>
> >> I'm not sure how to assess the level of elegance of the proposal, but I
> can
> >> comment on the technical characteristics:
> >>
> >> 1. Having specific interfaces that codify the logic that is currently
> >> prescribed in the comments reduces the chance of making a mistake.
> >> Comments may get ignored, misunderstood, etc., but if the contract is
> >> codified, the compiler will help to enforce the contract.
> >> 2. Given that the logic is trickier than it seems (the record-too-large
> >> case is an example that can easily confuse someone who's not intimately
> >> familiar with the nuances of the batching logic), having a few more
> >> hoops to jump through would give a greater chance that whoever tries to
> >> add a new case pauses and thinks a bit more.
> >> 3. As Justine pointed out, having different methods will be a forcing
> >> function to go through a KIP rather than smuggle new cases through
> >> implementation.
> >> 4. Sort of a consequence of the previous 3 -- all those things reduce
> >> the chance of someone writing code that works with 2 errors and then,
> >> when more errors are added in the future, suddenly and incorrectly
> >> ignores the new errors (the example I gave in the previous email).
> >>
> >>> [AL2 cont.] Similar to AL1, I see such a handler to some extent as
> >> business logic. If a user puts a bad filter condition in their KS app,
> >> and drops messages
> >>
> >> I agree that there is always a chance to get a bug and lose messages,
> >> but there is generally a separation of concerns with different risk
> >> profiles: the filtering logic may be more rigorously tested and rarely
> >> changed (say an application developer does it), but setting the topics
> >> to produce to may be done via configuration (e.g. a user of the
> >> application does it) and it's generally an expectation that users would
> >> get an error when the configuration is incorrect.
> >>
> >> What could be worse is that UnknownTopicOrPartitionException can be an
> >> intermittent error, i.e. with a generally correct configuration, there
> >> could be a metadata propagation problem on the cluster and then a random
> >> set of records could get lost.
> >>
> >>> [AL3] Maybe I misunderstand what you are saying, but to me, checking
> the
> >> size of the record upfront is exactly what the KIP proposes? No?
> >>
> >> It achieves the same result but solves it differently, my proposal:
> >>
> >> 1. Application checks the validity of a record (maybe via a new
> >> validateRecord method) before producing it, and can just exclude it or
> >> return an error to the user.
> >> 2. Application produces the record -- at this point there are no records
> >> that could return record too large, they were either skipped at step 1
> or
> >> we didn't get here because step 1 failed.
> >>
> >> Vs. KIP's proposal
> >>
> >> 1. Application produces the record.
> >> 2. Application gets a callback.
> >> 3. Application returns the action on how to proceed.
> >>
> >> The advantage of the former is the clarity of semantics -- the record is
> >> invalid (property of the record, not a function of server state or
> server
> >> configuration) and we can clearly know that it is the record that is bad
> >> and can never succeed.
> >>
> >> The KIP-proposed way has a very tricky point: it actually handles only
> >> a subset of record-too-large exceptions.  The broker can return
> >> record-too-large and reject the whole batch (but we don't want to ignore
> >> those because then we can skip random records that just happened to be
> >> in the same batch), so in some sense we use the same error for 2
> >> different conditions, and understanding that requires a pretty deep
> >> understanding of Kafka internals.
> >>
> >> -Artem
> >>
> >>
> >> On Wed, May 8, 2024 at 9:47 AM Justine Olshan
> <jols...@confluent.io.invalid
> >>>
> >> wrote:
> >>
> >>> My concern with respect to it being fragile: the code that ensures the
> >>> error type is internal to the producer. Someone may see it and say, I
> >>> want to add such and such error. This looks like internal code, so I
> >>> don't need a KIP, and then they can change it to whatever they want
> >>> thinking it is within the typical Kafka improvement protocol.
> >>>
> >>> Relying on an internal change to enforce an external API is fragile in
> my
> >>> opinion. That's why I sort of agreed with Artem with enforcing the
> error
> >> in
> >>> the method signature -- part of the public API.
> >>>
> >>> Chris's comments on requiring more information for the handler again
> >>> make me wonder if we are solving a problem of lack of information at
> >>> the application level with a more powerful solution than we need. (I.e.,
> >>> if we had more information, could the application close and restart the
> >>> transaction rather than having to drop records?) But I am happy to
> >>> compromise with a handler that we can agree is sufficiently controlled
> >>> and documented.
> >>>
> >>> Justine
> >>>
> >>> On Wed, May 8, 2024 at 7:20 AM Chris Egerton <chr...@aiven.io.invalid>
> >>> wrote:
> >>>
> >>>> Hi Alieh,
> >>>>
> >>>> Continuing prior discussions:
> >>>>
> >>>> 1) Regarding the "flexibility" discussion, my overarching point is
> >> that I
> >>>> don't see the point in allowing for this kind of pluggable logic
> >> without
> >>>> also covering more scenarios. Take example 2 in the KIP: if we're
> going
> >>> to
> >>>> implement retries only on "important" topics when a topic partition
> >> isn't
> >>>> found, why wouldn't we also want to be able to do this for other
> >> errors?
> >>>> Again, taking authorization errors as an example, why wouldn't we want
> >> to
> >>>> be able to fail when we can't write to "important" topics because the
> >>>> producer principal lacks sufficient ACLs, and drop the record if the
> >>> topic
> >>>> isn't "important"? In a security-conscious environment with
> >>>> runtime-dependent topic routing (which is a common feature of many
> >> source
> >>>> connectors, such as the Debezium connectors), this seems fairly
> likely.
> >>>>
> >>>> 2) As far as changing the shape of the API goes, I like Artem's idea
> of
> >>>> splitting out the interface based on specific exceptions. This may be
> a
> >>>> little laborious to expand in the future, but if we really want to
> >>>> limit the exceptions that we cover with the handler and move slowly
> and
> >>>> cautiously, then IMO it'd be reasonable to reflect that in the
> >>> interface. I
> >>>> also acknowledge that there's no way to completely prevent people from
> >>>> shooting themselves in the foot by implementing the API incorrectly,
> >> but
> >>> I
> >>>> think it's worth it to do what we can--including leveraging the Java
> >>>> language's type system--to help them, so IMO there's value to
> >> eliminating
> >>>> the implicit behavior of failing when a policy returns RETRY for a
> >>>> non-retriable error. This can take a variety of shapes and I'm not
> >> going
> >>> to
> >>>> insist on anything specific, but I do want to again raise my concerns
> >>> with
> >>>> the current proposal and request that we find something a little
> >> better.
> >>>>
> >>>> 3) Concerning the default implementation--actually, I meant what I
> >> wrote
> >>> :)
> >>>> I don't want a "second" default, I want an implementation of this
> >>> interface
> >>>> to be used as the default if no others are specified. The behavior of
> >>> this
> >>>> default implementation would be identical to existing behavior (so
> >> there
> >>>> would be no backwards compatibility concerns like the ones raised by
> >>>> Matthias), but it would be possible to configure this default handler
> >>> class
> >>>> to behave differently for a basic set of scenarios. This would mirror
> >>> (pun
> >>>> intended) the approach we've taken with Mirror Maker 2 and its
> >>>> ReplicationPolicy interface [1]. There is a default implementation
> >>>> available [2] that recognizes a handful of basic configuration
> >> properties
> >>>> [3] for simple tweaks, but if users want, they can also implement
> their
> >>> own
> >>>> replication policy for more fine-grained logic if those properties
> >> aren't
> >>>> flexible enough.
> >>>>
> >>>> More concretely, I'm imagining something like this for the producer
> >>>> exception handler:
> >>>>
> >>>> - Default implementation class
> >>>> of org.apache.kafka.clients.producer.DefaultProducerExceptionHandler
> >>>> - This class would recognize two properties:
> >>>>  - drop.invalid.large.records: Boolean property, defaults to false. If
> >>>> "false", then causes the handler to return FAIL whenever
> >>>> a RecordTooLargeException is encountered; if "true", then causes
> >>>> SWALLOW/SKIP/DROP to be returned instead
> >>>>  - unknown.topic.partition.retry.timeout.ms: Integer property,
> >> defaults
> >>>> to
> >>>> INT_MAX. Whenever an UnknownTopicOrPartitionException is encountered,
> >>>> causes the handler to return FAIL if that record has been pending for
> >>> more
> >>>> than the retry timeout; otherwise, causes RETRY to be returned
> >>>>
> >>>> I think this is worth addressing now instead of later because it
> forces
> >>> us
> >>>> to evaluate the usefulness of this interface and it addresses a
> >>>> long-standing issue not just with Kafka Connect, but with the Java
> >>> producer
> >>>> in general. For reference, here are a few tickets I collected after
> >>> briefly
> >>>> skimming our Jira showing that this is a real pain point for users:
> >>>> https://issues.apache.org/jira/browse/KAFKA-10340,
> >>>> https://issues.apache.org/jira/browse/KAFKA-12990,
> >>>> https://issues.apache.org/jira/browse/KAFKA-13634. Although this is
> >>>> frequently reported with Kafka Connect, it applies to anyone who
> >>> configures
> >>>> a producer to use a high retry timeout. I am aware of the
> max.block.ms
> >>>> property, but it's painful and IMO poor behavior to require users to
> >>> reduce
> >>>> the value of this property just to handle the single scenario when
> >> trying
> >>>> to write to topics that don't exist, since it would also limit the
> >> retry
> >>>> timeout for other operations that are legitimately retriable.
> >>>>
> >>>> Raising new points:
> >>>>
> >>>> 5) I don't see the interplay between this handler and existing
> >>>> retry-related properties mentioned anywhere in the KIP. I'm assuming
> >> that
> >>>> properties like "retries", "max.block.ms", and "delivery.timeout.ms"
> >>> would
> >>>> take precedence over the handler and once they are exhausted, the
> >>>> record/batch will fail no matter what? If so, it's probably worth
> >> briefly
> >>>> mentioning this (no more than a sentence or two) in the KIP, and if
> >> not,
> >>>> I'm curious what you have in mind.
> >>>>
> >>>> 6) I also wonder if the API provides enough information in its current
> >>>> form. Would it be possible to provide handlers with some way of
> >> tracking
> >>>> how long a record has been pending for (i.e., how long it's been since
> >>> the
> >>>> record was provided as an argument to Producer::send)? One way to do
> >> this
> >>>> could be to add a method like `onNewRecord(ProducerRecord)` and
> >>>> allow/require the handler to do its own bookkeeping, probably with a
> >>>> matching `onRecordSuccess(ProducerRecord)` method so that the handler
> >>>> doesn't eat up an ever-increasing amount of memory trying to track
> >>> records.
> >>>> An alternative could be to include information about the initial time
> >> the
> >>>> record was received by the producer and the number of retries that
> have
> >>>> been performed for it as parameters in the handle method(s), but I'm
> >> not
> >>>> sure how easy this would be to implement and it might clutter things
> >> up a
> >>>> bit too much.
> >>>>
> >>>> 7) A small request--can we add Closeable (or, if you prefer,
> >>> AutoCloseable)
> >>>> as a superinterface for the handler interface?
> >>>>
> >>>> [1] -
> >>>> https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/ReplicationPolicy.html
> >>>> [2] -
> >>>> https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html
> >>>> [3] -
> >>>> https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html#SEPARATOR_CONFIG
> >>>> ,
> >>>> https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html#INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG
> >>>>
> >>>> Cheers,
> >>>>
> >>>> Chris
> >>>>
> >>>> On Wed, May 8, 2024 at 12:37 AM Matthias J. Sax <mj...@apache.org>
> >>> wrote:
> >>>>
> >>>>> Very interesting discussion.
> >>>>>
> >>>>> Seems a central point is the question about "how generic" we approach
> >>>>> this, and some people think we need to be conservative and others
> >> think
> >>>>> we should try to be as generic as possible.
> >>>>>
> >>>>> Personally, I think following a limited scope for this KIP (by
> >>>>> explicitly saying we only cover RecordTooLarge and
> >>>>> UnknownTopicOrPartition) might be better. We have concrete tickets
> >> that
> >>>>> we address, while for other exception (like authorization) we don't
> >>> know
> >>>>> if people want to handle it to begin with. Boiling the ocean might
> >> not
> >>>>> get us too far, and being somewhat pragmatic might help to move this
> >>> KIP
> >>>>> forward. -- I also agree with Justine and Artem, that we want to be
> >>>>> careful anyway to not allow users to break stuff too easily.
> >>>>>
> >>>>> At the same time, I agree that we should set up this change in a
> >>>>> forward-looking way, and thus having a single generic handler allows
> >>>>> us to later extend the handler more easily. This should also simplify
> >>>>> follow-up KIPs that might add new error cases (I actually mentioned
> >>>>> one more to Alieh already, but we both agreed that it might be best
> >>>>> to exclude it from the KIP right now, to make the 3.8 deadline. Doing
> >>>>> a follow-up KIP is not the end of the world.)
> >>>>>
> >>>>>
> >>>>>
> >>>>> @Chris:
> >>>>>
> >>>>> (2) This sounds fair to me, but not sure how "bad" it actually would
> >>>>> be? If the contract is clearly defined, what the KIP proposes seems
> >>>>> to be fine, and given that such a handler is an expert API, and we
> >>>>> can provide "best practices" (cf. my other comment below in [AL1]),
> >>>>> being a little bit pragmatic sounds fine to me.
> >>>>>
> >>>>> Not sure if I understand Justine's argument on this question?
> >>>>>
> >>>>>
> >>>>> (3) About having a default impl or not. I am fine with adding one,
> >> even
> >>>>> if I am not sure why Connect could not just add its own one and plug
> >> it
> >>>>> in (and we would add corresponding configs for Connect, but not for
> >> the
> >>>>> producer itself)? For this case, we could also do this as a follow up
> >>>>> KIP, but happy to include it in this KIP to provide value to Connect
> >>>>> right away (even if the value might not come right away if we miss
> >> the
> >>>>> 3.8 deadline due to expanded KIP scope...) --  For KS, we would for
> >>>>> sure plug in our own impl, and lock down the config such that users
> >>>>> cannot set their own handler on the internal producer to begin with.
> >>>>> Might be good to elaborate why the producer should have a default? We
> >>>>> might actually want to add this to the KIP right away?
> >>>>>
> >>>>> The key for a default impl would be to not change the current
> >>>>> behavior, and having no default seems to achieve this. For the two
> >>>>> cases you mentioned, it's unclear to me what default value on "upper
> >>>>> bound on retries" for UnknownTopicOrPartitionException we should set?
> >>>>> Seems it would need to be the same as `delivery.timeout.ms`? However,
> >>>>> if users have `delivery.timeout.ms` actually overwritten we would need
> >>>>> to set this config somewhat "dynamic"? Is this feasible? If we
> >>>>> hard-code 2 minutes, it might not be backward compatible. I have the
> >>>>> impression we might introduce some undesired coupling? -- For the
> >>>>> "record too large" case, the config seems to be boolean and setting it
> >>>>> to `false` by default seems to provide backward compatibility.
> >>>>>
> >>>>>
> >>>>>
> >>>>> @Artem:
> >>>>>
> >>>>> [AL1] While I see the point, I would think having a different
> >>>>> callback for every exception might not really be elegant? In the end,
> >>>>> the handler is a very advanced feature anyway, and if it's implemented
> >>>>> in a bad way, well, it's a user error -- we cannot protect users from
> >>>>> everything. To me, a handler like this is to some extent "business
> >>>>> logic" and if a user gets business logic wrong, it's hard to protect
> >>>>> them. -- We would of course provide best practice guidance in the
> >>>>> JavaDocs, and explain that a handler should have explicit `if`
> >>>>> statements for stuff it wants to handle, and only a single default
> >>>>> which returns FAIL.
> >>>>>
> >>>>>
> >>>>> [AL2] Yes, but for KS we would retry at the application layer. I.e.,
> >>>>> the TX is not completed, we clean up and set up our task from scratch,
> >>>>> to ensure the pending transaction is completed before we resume. If
> >>>>> the TX was indeed aborted, we would retry from an older offset and
> >>>>> thus just hit the same error again and the loop begins again.
> >>>>>
> >>>>>
> >>>>> [AL2 cont.] Similar to AL1, I see such a handler to some extent as
> >>>>> business logic. If a user puts a bad filter condition in their KS
> >>>>> app, and drops messages, there is nothing we can do about it, and
> >>>>> this handler, IMHO, has a similar purpose. This is also the line of
> >>>>> thinking I apply to EOS, to address Justine's concern about "should
> >>>>> we allow to drop for EOS", and my answer is "yes", because it's more
> >>>>> business logic than actual error handling IMHO. And by default, we
> >>>>> fail... So users opt in to add business logic to drop records. It's
> >>>>> an application-level decision how to write the code.
> >>>>>
> >>>>>
> >>>>> [AL3] Maybe I misunderstand what you are saying, but to me, checking
> >>> the
> >>>>> size of the record upfront is exactly what the KIP proposes? No?
> >>>>>
> >>>>>
> >>>>>
> >>>>> @Justine:
> >>>>>
> >>>>>> I saw the sample
> >>>>>> code -- is it just an if statement checking for the error before
> >> the
> >>>>>> handler is invoked? That seems a bit fragile.
> >>>>>
> >>>>> What do you mean by fragile? Not sure if I see your point.
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>> On 5/7/24 5:33 PM, Artem Livshits wrote:
> >>>>>> Hi Alieh,
> >>>>>>
> >>>>>> Thanks for the KIP.  The motivation talks about very specific
> >> cases,
> >>>> but
> >>>>>> the interface is generic.
> >>>>>>
> >>>>>> [AL1]
> >>>>>> If the interface evolves in the future I think we could have the
> >>>>> following
> >>>>>> confusion:
> >>>>>>
> >>>>>> 1. A user implemented SWALLOW action for both
> >>>>>> RecordTooLargeException and UnknownTopicOrPartitionException.  For
> >>>>>> simplicity they just return SWALLOW from the function, because it
> >>>>>> elegantly handles all known cases.
> >>>>>> 2. The interface has evolved to support a new exception.
> >>>>>> 3. The user has upgraded their Kafka client.
> >>>>>>
> >>>>>> Now a new kind of error gets dropped on the floor without the user's
> >>>>>> intention and it would be super hard to detect and debug.
> >>>>>>
> >>>>>> To avoid the confusion, I think we should use handlers for specific
> >>>>>> exceptions.  Then if a new exception is added it won't get silently
> >>>>>> swallowed because the user would need to add new functionality to
> >>>>>> handle it.
> >>>>>>
> >>>>>> I also have some higher level comments:
> >>>>>>
> >>>>>> [AL2]
> >>>>>>> it throws a TimeoutException, and the user can only blindly retry,
> >>>> which
> >>>>>> may result in an infinite retry loop
> >>>>>>
> >>>>>> If the TimeoutException happens during transactional processing
> >>>>>> (exactly once is the desired semantics), then the client should not
> >>>>>> retry when it gets TimeoutException because without knowing the
> >>>>>> reason for TimeoutExceptions, the client cannot know whether the
> >>>>>> message got actually produced or not and retrying the message may
> >>>>>> result in duplicates.
> >>>>>>
> >>>>>>> The thrown TimeoutException "cuts" the connection to the
> >> underlying
> >>>> root
> >>>>>> cause of missing metadata
> >>>>>>
> >>>>>> Maybe we should fix the error handling and return the proper
> >>> underlying
> >>>>>> message?  Then the application can properly handle the message
> >> based
> >>> on
> >>>>>> preferences.
> >>>>>>
> >>>>>> From the product perspective, it's not clear how safe it is to
> >>> blindly
> >>>>>> ignore UnknownTopicOrPartitionException.  This could lead to
> >>> situations
> >>>>>> when a simple typo could lead to massive data loss (part of the
> >> data
> >>>>> would
> >>>>>> effectively be produced to a "black hole" and the user may not
> >> notice
> >>>> it
> >>>>>> for a while).
> >>>>>>
> >>>>>> In which situations would you recommend the user to "black hole"
> >>>> messages
> >>>>>> in case of misconfiguration?
> >>>>>>
> >>>>>> [AL3]
> >>>>>>
> >>>>>>> If the custom handler decides on SWALLOW for
> >>> RecordTooLargeException,
> >>>>>>
> >>>>>> Is my understanding correct that this KIP proposes functionality that
> >>>>>> would only be able to SWALLOW RecordTooLargeExceptions that happen
> >>>>>> because the producer cannot produce the record (if the broker rejects
> >>>>>> the batch, the error won't get to the handler, because we cannot know
> >>>>>> which other records get ignored)?  In this case, why not just check
> >>>>>> the locally configured max record size upfront and not produce the
> >>>>>> record in the first place?  Maybe we can expose a validation function
> >>>>>> from the producer that could validate the records locally, so we
> >>>>>> don't need to produce the record in order to know that it's invalid.
> >>>>>>
> >>>>>> -Artem
> >>>>>>
> >>>>>> On Tue, May 7, 2024 at 2:07 PM Justine Olshan
> >>>>> <jols...@confluent.io.invalid>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Alieh and Chris,
> >>>>>>>
> >>>>>>> Thanks for clarifying 1) but I saw the motivation. I guess I just
> >>>> didn't
> >>>>>>> understand how that would be ensured on the producer side. I saw
> >> the
> >>>>> sample
> >>>>>>> code -- is it just an if statement checking for the error before
> >> the
> >>>>>>> handler is invoked? That seems a bit fragile.
> >>>>>>>
> >>>>>>> Can you clarify what you mean by `since the code does not reach
> >> the
> >>> KS
> >>>>>>> interface and breaks somewhere in producer.` If we surfaced this
> >>> error
> >>>>> to
> >>>>>>> the application in a better way would that also be a solution to
> >> the
> >>>>> issue?
> >>>>>>>
> >>>>>>> Justine
> >>>>>>>
> >>>>>>> On Tue, May 7, 2024 at 1:55 PM Alieh Saeedi
> >>>>> <asae...@confluent.io.invalid>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Thank you, Chris and Justine, for the feedback.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> @Chris
> >>>>>>>>
> >>>>>>>> 1) Flexibility: it has two meanings. The first meaning is the one
> >>> you
> >>>>>>>> mentioned. We are going to cover more exceptions in the future,
> >> but
> >>>> as
> >>>>>>>> Justine mentioned, we must be very conservative about adding more
> >>>>>>>> exceptions. Additionally, flexibility mainly means that the user
> >> is
> >>>>> able
> >>>>>>> to
> >>>>>>>> develop their own code. As mentioned in the motivation section
> >> and
> >>>> the
> >>>>>>>> examples, sometimes the user decides on dropping a record based
> >> on
> >>>> the
> >>>>>>>> topic, for example.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> 2) Defining two separate methods for retriable and non-retriable
> >>>>>>>> exceptions: although the idea is brilliant, the user may still
> >>>>>>>> make a mistake by implementing the wrong method and see unexpected
> >>>>>>>> behaviour. For example, he may implement handleRetriable() for
> >>>>>>>> RecordTooLargeException and define SWALLOW for the exception, but
> >>>>>>>> in practice, he sees no change in default behaviour since he
> >>>>>>>> implemented the wrong method. I think we can never reduce the
> >>>>>>>> user’s mistakes to 0.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> 3) Default implementation for Handler: the default behaviour is
> >>>> already
> >>>>>>>> preserved with NO need of implementing any handler or setting the
> >>>>>>>> corresponding config parameter `custom.exception.handler`. What
> >> you
> >>>>> mean
> >>>>>>> is
> >>>>>>>> actually having a second default, which requires having both
> >>>> interface
> >>>>>>> and
> >>>>>>>> config parameters. About UnknownTopicOrPartitionException: the
> >>>> producer
> >>>>>>>> already offers the config parameter `max.block.ms` which
> >>> determines
> >>>>> the
> >>>>>>>> duration of retrying. The main purpose of the user who needs the
> >>>>> handler
> >>>>>>> is
> >>>>>>>> to get the root cause of TimeoutException and handle it in the
> >> way
> >>> he
> >>>>>>>> intends. The KIP explains the necessity of it for KS users.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> 4) Naming issue: By SWALLOW, we meant actually swallow the error,
> >>>> while
> >>>>>>>> SKIP means skip the record, I think. If it makes sense for more
> >>> ppl,
> >>>> I
> >>>>>>> can
> >>>>>>>> change it to SKIP
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> @Justine
> >>>>>>>>
> >>>>>>>> 1) was addressed by Chris.
> >>>>>>>>
> >>>>>>>> 2 and 3) The problem is exactly what you mentioned. Currently,
> >>> there
> >>>> is
> >>>>>>> no
> >>>>>>>> way to handle these issues application-side. Even KS users who
> >>>>> implement
> >>>>>>> KS
> >>>>>>>> ProductionExceptionHandler are not able to handle the exceptions
> >> as
> >>>>> they
> >>>>>>>> intend since the code does not reach the KS interface and breaks
> >>>>>>> somewhere
> >>>>>>>> in producer.
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>> Alieh
> >>>>>>>>
> >>>>>>>> On Tue, May 7, 2024 at 8:43 PM Chris Egerton <
> >>>> fearthecel...@gmail.com>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Justine,
> >>>>>>>>>
> >>>>>>>>> The method signatures for the interface are indeed open-ended,
> >> but
> >>>> the
> >>>>>>>> KIP
> >>>>>>>>> states that its uses will be limited. See the motivation
> >> section:
> >>>>>>>>>
> >>>>>>>>>> We believe that the user should be able to develop custom
> >>> exception
> >>>>>>>>> handlers for managing producer exceptions. On the other hand,
> >> this
> >>>>> will
> >>>>>>>> be
> >>>>>>>>> an expert-level API, and using that may result in strange
> >>> behaviour
> >>>> in
> >>>>>>>> the
> >>>>>>>>> system, making it hard to find the root cause. Therefore, the
> >>> custom
> >>>>>>>>> handler is currently limited to handling RecordTooLargeException
> >>> and
> >>>>>>>>> UnknownTopicOrPartitionException.
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>>
> >>>>>>>>> Chris
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Tue, May 7, 2024, 14:37 Justine Olshan
> >>>>> <jols...@confluent.io.invalid
> >>>>>>>>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Alieh,
> >>>>>>>>>>
> >>>>>>>>>> I was out for KSB and then was also sick. :(
> >>>>>>>>>>
> >>>>>>>>>> To your point 1) Chris, I don't think it is limited to two
> >>> specific
> >>>>>>>>>> scenarios, since the interface accepts a generic Exception e
> >> and
> >>>> can
> >>>>>>> be
> >>>>>>>>>> implemented to check if that e is an instanceof any exception.
> >> I
> >>>>>>> didn't
> >>>>>>>>> see
> >>>>>>>>>> anywhere that specific errors are enforced. I'm a bit concerned
> >>>> about
> >>>>>>>>> this
> >>>>>>>>>> actually. I'm concerned about the opened-endedness and the
> >>> contract
> >>>>>>> we
> >>>>>>>>> have
> >>>>>>>>>> with transactions. We are allowing the client to make decisions
> >>>> that
> >>>>>>>> are
> >>>>>>>>>> somewhat invisible to the server. As an aside, can we build in
> >>> log
> >>>>>>>>> messages
> >>>>>>>>>> when the handler decides to skip, etc., a message? I'm really
> >>>> concerned
> >>>>>>>>> about
> >>>>>>>>>> messages being silently dropped.
> >>>>>>>>>>
> >>>>>>>>>> I do think Chris's point 2) about retriable vs non retriable
> >>> errors
> >>>>>>> is
> >>>>>>>>>> fair. I'm a bit concerned about skipping an unknown topic or
> >>>> partition
> >>>>>>>>>> exception too early, as there are cases where it can be
> >>> transient.
> >>>>>>>>>>
> >>>>>>>>>> I'm still a little bit wary of allowing dropping records as
> >> part
> >>> of
> >>>>>>> EOS
> >>>>>>>>>> generally as in many cases, these errors signify an issue with
> >>> the
> >>>>>>>>> original
> >>>>>>>>>> data. I understand that streams and connect/mirror maker may
> >> have
> >>>>>>>> reasons
> >>>>>>>>>> they want to progress past these messages, but wondering if
> >> there
> >>>> is
> >>>>>>> a
> >>>>>>>>> way
> >>>>>>>>>> that can be done application-side. I'm willing to accept this
> >>> sort
> >>>> of
> >>>>>>>>>> proposal if we can make it clear that this sort of thing is
> >>>> happening
> >>>>>>>> and
> >>>>>>>>>> we limit the blast radius for what we can do.
> >>>>>>>>>>
> >>>>>>>>>> Justine
> >>>>>>>>>>
> >>>>>>>>>> On Tue, May 7, 2024 at 9:55 AM Chris Egerton
> >>>> <chr...@aiven.io.invalid
> >>>>>>>>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi Alieh,
> >>>>>>>>>>>
> >>>>>>>>>>> Sorry for the delay, I've been out sick. I still have some
> >>>> thoughts
> >>>>>>>>> that
> >>>>>>>>>>> I'd like to see addressed before voting.
> >>>>>>>>>>>
> >>>>>>>>>>> 1) If flexibility is the motivation for a pluggable interface,
> >>> why
> >>>>>>>> are
> >>>>>>>>> we
> >>>>>>>>>>> only limiting the uses for this interface to two very specific
> >>>>>>>>> scenarios?
> >>>>>>>>>>> Why not also allow, e.g., authorization errors to be handled
> >> as
> >>>>>>> well
> >>>>>>>>>>> (allowing users to drop records destined for some off-limits
> >>>>>>> topics,
> >>>>>>>> or
> >>>>>>>>>>> retry for a limited duration in case there's a delay in the
> >>>>>>>> propagation
> >>>>>>>>>> of
> >>>>>>>>>>> ACL updates)? It'd be nice to see some analysis of other
> >> errors
> >>>>>>> that
> >>>>>>>>>> could
> >>>>>>>>>>> be handled with this new API, both to avoid the follow-up work
> >>> of
> >>>>>>>>> another
> >>>>>>>>>>> KIP to address them in the future, and to make sure that we're
> >>> not
> >>>>>>>>>> painting
> >>>>>>>>>>> ourselves into a corner with the current API in a way that
> >> would
> >>>>>>> make
> >>>>>>>>>>> future modifications difficult.
> >>>>>>>>>>>
> >>>>>>>>>>> 2) Something feels a bit off with how retriable vs.
> >>> non-retriable
> >>>>>>>>> errors
> >>>>>>>>>>> are handled with the interface. Why not introduce two separate
> >>>>>>>> methods
> >>>>>>>>> to
> >>>>>>>>>>> handle each case separately? That way there's no ambiguity or
> >>>>>>>> implicit
> >>>>>>>>>>> behavior when, e.g., attempting to retry on a
> >>>>>>>> RecordTooLargeException.
> >>>>>>>>>> This
> >>>>>>>>>>> could be something like `NonRetriableResponse
> >>>>>>> handle(ProducerRecord,
> >>>>>>>>>>> Exception)` and `RetriableResponse
> >>> handleRetriable(ProducerRecord,
> >>>>>>>>>>> Exception)`, though the exact names and shape can obviously be
> >>>>>>> toyed
> >>>>>>>>>> with a
> >>>>>>>>>>> bit.
> >>>>>>>>>>>
> >>>>>>>>>>> 3) Although the flexibility of a pluggable interface may
> >> benefit
> >>>>>>> some
> >>>>>>>>>>> users' custom producer applications and Kafka Streams
> >>>> applications,
> >>>>>>>> it
> >>>>>>>>>>> comes at significant deployment cost for other low-/no-code
> >>>>>>>>> environments,
> >>>>>>>>>>> including but not limited to Kafka Connect and MirrorMaker 2.
> >>> Can
> >>>>>>> we
> >>>>>>>>> add
> >>>>>>>>>> a
> >>>>>>>>>>> default implementation of the exception handler that allows
> >> for
> >>>>>>> some
> >>>>>>>>>> simple
> >>>>>>>>>>> behavior to be tweaked via configuration property? Two things
> >>> that
> >>>>>>>>> would
> >>>>>>>>>> be
> >>>>>>>>>>> nice to have would be A) an upper bound on the retry time for
> >>>>>>>>>>> unknown-topic-partition exceptions and B) an option to drop
> >>>> records
> >>>>>>>>> that
> >>>>>>>>>>> are large enough to trigger a record-too-large exception.
> >>>>>>>>>>>
> >>>>>>>>>>> 4) I'd still prefer to see "SKIP" or "DROP" instead of the
> >>>> proposed
> >>>>>>>>>>> "SWALLOW" option, which IMO is opaque and non-obvious,
> >>> especially
> >>>>>>>> when
> >>>>>>>>>>> trying to guess the behavior for retriable errors.
> >>>>>>>>>>>
> >>>>>>>>>>> Cheers,
> >>>>>>>>>>>
> >>>>>>>>>>> Chris
> >>>>>>>>>>>
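Points 2) and 3) of the message above can be sketched roughly as follows: one method per error class, plus a small default implementation driven by two settings (drop oversized records, and an upper bound on how long to keep retrying). This is only an illustration under stated assumptions. Every name here (`SketchRecord`, `SplitHandlerSketch`, `ConfigDrivenHandlerSketch`, the enum constants, the constructor parameters) is hypothetical and comes from neither the KIP nor the Kafka client API. `SketchRecord` stands in for `ProducerRecord` so the snippet compiles without Kafka on the classpath.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical stand-in for ProducerRecord; only the topic matters here.
final class SketchRecord {
    final String topic;
    SketchRecord(String topic) { this.topic = topic; }
}

// Assumed response types: RETRY is simply not expressible for non-retriable errors.
enum NonRetriableResponse { FAIL, SKIP }
enum RetriableResponse { FAIL, RETRY }

// Assumed shape of a handler with one method per error class.
interface SplitHandlerSketch {
    NonRetriableResponse handle(SketchRecord record, Exception exception);
    RetriableResponse handleRetriable(SketchRecord record, Exception exception);
}

// Assumed default implementation tweakable via two settings.
// Not thread-safe; a real implementation would need to consider that.
final class ConfigDrivenHandlerSketch implements SplitHandlerSketch {
    private final boolean dropOversized;   // hypothetical setting
    private final long maxRetryMs;         // hypothetical setting
    private final LongSupplier clockMs;    // injected so the sketch is testable
    private final Map<String, Long> firstFailureMs = new HashMap<>();

    ConfigDrivenHandlerSketch(boolean dropOversized, long maxRetryMs, LongSupplier clockMs) {
        this.dropOversized = dropOversized;
        this.maxRetryMs = maxRetryMs;
        this.clockMs = clockMs;
    }

    @Override
    public NonRetriableResponse handle(SketchRecord record, Exception exception) {
        // Non-retriable errors (e.g. a record that is too large): drop or fail.
        return dropOversized ? NonRetriableResponse.SKIP : NonRetriableResponse.FAIL;
    }

    @Override
    public RetriableResponse handleRetriable(SketchRecord record, Exception exception) {
        // Remember when this topic first failed and stop retrying after the bound.
        long now = clockMs.getAsLong();
        long first = firstFailureMs.computeIfAbsent(record.topic, t -> now);
        return (now - first) < maxRetryMs ? RetriableResponse.RETRY : RetriableResponse.FAIL;
    }
}
```

Splitting the return types means a handler cannot ask for a retry on an error that can never succeed, which is the ambiguity the message raises about RETRY on RecordTooLargeException.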
> >>>>>>>>>>> On Fri, May 3, 2024 at 11:23 AM Alieh Saeedi
> >>>>>>>>>> <asae...@confluent.io.invalid
> >>>>>>>>>>>>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> A summary of the KIP and the discussions:
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> The KIP introduces a handler interface for Producer in order
> >> to
> >>>>>>>>> handle
> >>>>>>>>>>> two
> >>>>>>>>>>>> exceptions: RecordTooLargeException and
> >>>>>>>>>> UnknownTopicOrPartitionException.
> >>>>>>>>>>>> The handler handles the exceptions per-record.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> - Do we need this handler?  [Motivation and Examples
> >> sections]
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> RecordTooLargeException: 1) In transactions, the producer
> >>>>>>> collects
> >>>>>>>>>>> multiple
> >>>>>>>>>>>> records in batches. Then a RecordTooLargeException related
> >> to a
> >>>>>>>>> single
> >>>>>>>>>>>> record leads to failing the entire batch. A custom exception
> >>>>>>>> handler
> >>>>>>>>> in
> >>>>>>>>>>>> this case may decide on dropping the record and continuing
> >> the
> >>>>>>>>>>> processing.
> >>>>>>>>>>>> See Example 1, please. 2) Moreover, in Kafka Streams, a
> >> record
> >>>>>>>> that
> >>>>>>>>> is
> >>>>>>>>>>> too
> >>>>>>>>>>>> large is a poison pill record, and there is no way to skip
> >> over
> >>>>>>>> it. A
> >>>>>>>>>>>> handler would allow us to react to this error inside the
> >>>>>>> producer,
> >>>>>>>>>> i.e.,
> >>>>>>>>>>>> local to where the error happens, and thus simplify the
> >> overall
> >>>>>>>> code
> >>>>>>>>>>>> significantly. Please read the Motivation section for more
> >>>>>>>>> explanation.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> UnknownTopicOrPartitionException: For this case, the producer
> >>>>>>>> handles
> >>>>>>>>>>> this
> >>>>>>>>>>>> exception internally and only issues a WARN log about missing
> >>>>>>>>> metadata
> >>>>>>>>>>> and
> >>>>>>>>>>>> retries internally. Later, when the producer hits "
> >>>>>>>>> delivery.timeout.ms"
> >>>>>>>>>>> it
> >>>>>>>>>>>> throws a TimeoutException, and the user can only blindly
> >> retry,
> >>>>>>>> which
> >>>>>>>>>> may
> >>>>>>>>>>>> result in an infinite retry loop. The thrown TimeoutException
> >>>>>>>> "cuts"
> >>>>>>>>>> the
> >>>>>>>>>>>> connection to the underlying root cause of missing metadata
> >>>>>>> (which
> >>>>>>>>>> could
> >>>>>>>>>>>> indeed be a transient error but is persistent for a
> >>> non-existing
> >>>>>>>>>> topic).
> >>>>>>>>>>>> Thus, there is no programmatic way to break the infinite
> >> retry
> >>>>>>>> loop.
> >>>>>>>>>>> Kafka
> >>>>>>>>>>>> Streams also blindly retries for this case, and the
> >> application
> >>>>>>>> gets
> >>>>>>>>>>> stuck.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> - Having interface vs configuration option: [Motivation,
> >>>>>>> Examples,
> >>>>>>>>> and
> >>>>>>>>>>>> Rejected Alternatives sections]
> >>>>>>>>>>>>
> >>>>>>>>>>>> Our solution is introducing an interface due to the full
> >>>>>>>> flexibility
> >>>>>>>>>> that
> >>>>>>>>>>>> it offers. Sometimes users, especially Kafka Streams ones,
> >>>>>>>> determine
> >>>>>>>>>> the
> >>>>>>>>>>>> handler's behaviour based on the situation. For example,
> >>>>>>>>>>>> facing UnknownTopicOrPartitionException, the user may want
> >> to
> >>>>>>>> raise
> >>>>>>>>> an
> >>>>>>>>>>>> error for some topics but retry it for other topics. Having a
> >>>>>>>>>>> configuration
> >>>>>>>>>>>> option with a fixed set of possibilities does not serve the
> >>>>>>> user's
> >>>>>>>>>>>> needs. See Example 2, please.
> >>>>>>>>>>>>
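The per-topic decision described above (fail for some topics, keep retrying for others) might look roughly like the sketch below. It is not the KIP's Example 2. The class names, the stand-in exception, and the `mustExist` set are assumptions made for illustration, and only the FAIL/RETRY/SWALLOW response names come from the summary above.

```java
import java.util.Set;

// Hypothetical stand-in for ProducerRecord.
final class TopicRecordSketch {
    final String topic;
    TopicRecordSketch(String topic) { this.topic = topic; }
}

// Response names as listed in the summary; the enum itself is a stand-in.
enum HandlerResponseSketch { FAIL, RETRY, SWALLOW }

// Stand-in for UnknownTopicOrPartitionException so no Kafka dependency is needed.
final class UnknownTopicSketchException extends RuntimeException {
    UnknownTopicSketchException(String message) { super(message); }
}

final class PerTopicHandlerSketch {
    // Topics that are expected to exist already: a missing one is a hard error.
    private final Set<String> mustExist;

    PerTopicHandlerSketch(Set<String> mustExist) { this.mustExist = mustExist; }

    HandlerResponseSketch handle(TopicRecordSketch record, Exception exception) {
        if (exception instanceof UnknownTopicSketchException) {
            // Fail fast for topics that must exist; keep retrying for topics
            // that may still be in the middle of being created.
            return mustExist.contains(record.topic)
                    ? HandlerResponseSketch.FAIL
                    : HandlerResponseSketch.RETRY;
        }
        // Anything this handler does not understand keeps today's behaviour.
        return HandlerResponseSketch.FAIL;
    }
}
```

A fixed configuration option could express one policy for all topics, but not a rule that depends on the record's topic, which is the flexibility argument made above.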
> >>>>>>>>>>>>
> >>>>>>>>>>>> - Note on RecordTooLargeException: [Public Interfaces
> >> section]
> >>>>>>>>>>>>
> >>>>>>>>>>>> If the custom handler decides on SWALLOW for
> >>>>>>>> RecordTooLargeException,
> >>>>>>>>>>> then
> >>>>>>>>>>>> this record will not be a part of the batch of transactions
> >> and
> >>>>>>>> will
> >>>>>>>>>> also
> >>>>>>>>>>>> not be sent to the broker in non-transactional mode. So no
> >>>>>>> worries
> >>>>>>>>>> about
> >>>>>>>>>>>> getting a RecordTooLargeException from the broker in this
> >> case,
> >>>>>>> as
> >>>>>>>>> the
> >>>>>>>>>>>> record will never ever be sent to the broker. SWALLOW means
> >>> drop
> >>>>>>>> the
> >>>>>>>>>>> record
> >>>>>>>>>>>> and continue/swallow the error.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> - What if the handle() method implements RETRY for
> >>>>>>>>>>> RecordTooLargeException?
> >>>>>>>>>>>> [Proposed Changes section]
> >>>>>>>>>>>>
> >>>>>>>>>>>> We have to limit the user to only have FAIL or SWALLOW for
> >>>>>>>>>>>> RecordTooLargeException. Actually, RETRY must be equal to
> >> FAIL.
> >>>>>>>> This
> >>>>>>>>> is
> >>>>>>>>>>>> well documented/informed in javadoc.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> - What if the handle() method of the handler throws an
> >>> exception?
> >>>>>>>>>>>>
> >>>>>>>>>>>> The handler is expected to have correct code. If it throws an
> >>>>>>>>>> exception,
> >>>>>>>>>>>> everything fails.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> This is a PoC PR <https://github.com/apache/kafka/pull/15846
> >>>
> >>>>>>> ONLY
> >>>>>>>>> for
> >>>>>>>>>>>> RecordTooLargeException. The code changes related to
> >>>>>>>>>>>> UnknownTopicOrPartitionException will be added to this PR
> >>> LATER.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> Looking forward to your feedback again.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Alieh
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Thu, Apr 25, 2024 at 11:46 PM Kirk True <
> >> k...@kirktrue.pro>
> >>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi Alieh,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for the updates!
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Comments inline...
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Apr 25, 2024, at 1:10 PM, Alieh Saeedi
> >>>>>>>>>>> <asae...@confluent.io.INVALID
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks a lot for the constructive feedback!
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Addressing some of the main concerns:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> - The `RecordTooLargeException` can be thrown by broker,
> >>>>>>>> producer
> >>>>>>>>>> and
> >>>>>>>>>>>>>> consumer. Of course, the `ProducerExceptionHandler`
> >> interface
> >>>>>>>> is
> >>>>>>>>>>>>> introduced
> >>>>>>>>>>>>>> to affect only the exceptions thrown from the producer.
> >> This
> >>>>>>>> KIP
> >>>>>>>>>> very
> >>>>>>>>>>>>>> specifically means to provide a possibility to manage the
> >>>>>>>>>>>>>> `RecordTooLargeException` thrown from the Producer.send()
> >>>>>>>> method.
> >>>>>>>>>>>> Please
> >>>>>>>>>>>>>> see “Proposed Changes” section for more clarity. I
> >>>>>>> investigated
> >>>>>>>>> the
> >>>>>>>>>>>> issue
> >>>>>>>>>>>>>> there thoroughly. I hope it can explain the concern about
> >> how
> >>>>>>>> we
> >>>>>>>>>>> handle
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>> errors as well.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> - The problem with Callback: Methods of Callback are called
> >>>>>>>> when
> >>>>>>>>>> the
> >>>>>>>>>>>>> record
> >>>>>>>>>>>>>> sent to the server is acknowledged, while this is not the
> >>>>>>>> desired
> >>>>>>>>>>> time
> >>>>>>>>>>>>> for
> >>>>>>>>>>>>>> all exceptions. We intend to handle exceptions beforehand.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I guess it makes sense to keep the expectation for when
> >>>>>>> Callback
> >>>>>>>> is
> >>>>>>>>>>>>> invoked as-is vs. shoehorning more into it.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> - What if the custom handler returns RETRY for
> >>>>>>>>>>>>> `RecordTooLargeException`? I
> >>>>>>>>>>>>>> assume changing the producer configuration at runtime is
> >>>>>>>>> possible.
> >>>>>>>>>> If
> >>>>>>>>>>>> so,
> >>>>>>>>>>>>>> RETRY for a too large record is valid because maybe in the
> >>>>>>> next
> >>>>>>>>>> try,
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>> too large record is not poisoning any more. I am not 100%
> >>>>>>> sure
> >>>>>>>>>> about
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>> technical details, though. Otherwise, we can consider the
> >>>>>>> RETRY
> >>>>>>>>> as
> >>>>>>>>>>> FAIL
> >>>>>>>>>>>>> for
> >>>>>>>>>>>>>> this exception. Another solution would be to consider a
> >>>>>>>> constant
> >>>>>>>>>>> number
> >>>>>>>>>>>>> of
> >>>>>>>>>>>>>> times for RETRY which can be useful for other exceptions as
> >>>>>>>> well.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> It’s not presently possible to change the configuration of
> >> an
> >>>>>>>>>> existing
> >>>>>>>>>>>>> Producer at runtime. So if a record hits a
> >>>>>>>> RecordTooLargeException
> >>>>>>>>>>> once,
> >>>>>>>>>>>> no
> >>>>>>>>>>>>> amount of retrying (with the current Producer) will change
> >>> that
> >>>>>>>>> fact.
> >>>>>>>>>>> So
> >>>>>>>>>>>>> I’m still a little stuck on how to handle a response of
> >> RETRY
> >>>>>>> for
> >>>>>>>>> an
> >>>>>>>>>>>>> “oversized” record.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> - What if the handle() method itself throws an exception? I
> >>>>>>>> think
> >>>>>>>>>>>>>> rationally and pragmatically, the behaviour must be exactly
> >>>>>>>> like
> >>>>>>>>>> when
> >>>>>>>>>>>> no
> >>>>>>>>>>>>>> custom handler is defined since the user actually did not
> >>>>>>> have
> >>>>>>>> a
> >>>>>>>>>>>> working
> >>>>>>>>>>>>>> handler.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I’m not convinced that ignoring an errant handler is the
> >> right
> >>>>>>>>>> choice.
> >>>>>>>>>>> It
> >>>>>>>>>>>>> then becomes a silent failure that might have repercussions,
> >>>>>>>>>> depending
> >>>>>>>>>>> on
> >>>>>>>>>>>>> the business logic. A user would have to proactively trawl
> >>>>>>>> through
> >>>>>>>>>> the
> >>>>>>>>>>>>> logs for WARN/ERROR messages to catch it.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Throwing a hard error is pretty draconian, though…
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> - Why not use config parameters instead of an interface? As
> >>>>>>>>>> explained
> >>>>>>>>>>>> in
> >>>>>>>>>>>>>> the “Rejected Alternatives” section, we assume that the
> >>>>>>> handler
> >>>>>>>>>> will
> >>>>>>>>>>> be
> >>>>>>>>>>>>>> used for a greater number of exceptions in the future.
> >>>>>>>> Defining a
> >>>>>>>>>>>>>> configuration parameter for each exception may make the
> >>>>>>>>>>> configuration a
> >>>>>>>>>>>>> bit
> >>>>>>>>>>>>>> messy. Moreover, the handler offers more flexibility.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Agreed that the logic-via-configuration approach is weird
> >> and
> >>>>>>>>>> limiting.
> >>>>>>>>>>>>> Forget I ever suggested it ;)
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I’d think additional background in the Motivation section
> >>> would
> >>>>>>>>> help
> >>>>>>>>>> me
> >>>>>>>>>>>>> understand how users might use this feature beyond a)
> >> skipping
> >>>>>>>>>>>> “oversized”
> >>>>>>>>>>>>> records, and b) not retrying missing topics.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Small change:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> -ProductionExceptionHandlerResponse -> Response for brevity
> >>>>>>> and
> >>>>>>>>>>>>> simplicity.
> >>>>>>>>>>>>>> Could’ve been HandlerResponse too I think!
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The name change sounds good to me.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks Alieh!
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I thank you all again for your useful
> >> questions/suggestions.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I would be happy to hear more of your concerns, as stated
> >> in
> >>>>>>>> some
> >>>>>>>>>>>>> feedback.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>> Alieh
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Wed, Apr 24, 2024 at 12:31 AM Justine Olshan
> >>>>>>>>>>>>>> <jols...@confluent.io.invalid> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks Alieh for the updates.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I'm a little concerned about the design pattern here. It
> >>>>>>> seems
> >>>>>>>>>> like
> >>>>>>>>>>> we
> >>>>>>>>>>>>> want
> >>>>>>>>>>>>>>> specific usages, but we are packaging it as a generic
> >>>>>>> handler.
> >>>>>>>>>>>>>>> I think we tried to narrow down on the specific errors we
> >>>>>>> want
> >>>>>>>>> to
> >>>>>>>>>>>>> handle,
> >>>>>>>>>>>>>>> but it feels a little clunky as we have a generic thing
> >> for
> >>>>>>>> two
> >>>>>>>>>>>> specific
> >>>>>>>>>>>>>>> errors.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I'm wondering if we are using the right patterns to solve
> >>>>>>>> these
> >>>>>>>>>>>>> problems. I
> >>>>>>>>>>>>>>> agree though that we will need something more than the
> >> error
> >>>>>>>>>> classes
> >>>>>>>>>>>> I'm
> >>>>>>>>>>>>>>> proposing if we want to have different handling be
> >>>>>>>> configurable.
> >>>>>>>>>>>>>>> My concern is that the open-endedness of a handler means
> >>>>>>> that
> >>>>>>>> we
> >>>>>>>>>> are
> >>>>>>>>>>>>>>> creating more problems than we are solving. It is still
> >>>>>>>> unclear
> >>>>>>>>> to
> >>>>>>>>>>> me
> >>>>>>>>>>>>> how
> >>>>>>>>>>>>>>> we expect to handle the errors. Perhaps we could include
> >> an
> >>>>>>>>>> example?
> >>>>>>>>>>>> It
> >>>>>>>>>>>>>>> seems like there is a specific use case in mind and maybe
> >> we
> >>>>>>>> can
> >>>>>>>>>>> make
> >>>>>>>>>>>> a
> >>>>>>>>>>>>>>> design that is tighter and supports that case.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Justine
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Tue, Apr 23, 2024 at 3:06 PM Kirk True <
> >>>>>>> k...@kirktrue.pro>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi Alieh,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks for the KIP!
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> A few questions:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> K1. What is the expected behavior for the producer if it
> >>>>>>>>>> generates
> >>>>>>>>>>> a
> >>>>>>>>>>>>>>>> RecordTooLargeException, but the handler returns RETRY?
> >>>>>>>>>>>>>>>> K2. How do we determine which Record was responsible for
> >>>>>>> the
> >>>>>>>>>>>>>>>> UnknownTopicOrPartitionException since we get that
> >> response
> >>>>>>>>> when
> >>>>>>>>>>>>>>> sending a
> >>>>>>>>>>>>>>>> batch of records?
> >>>>>>>>>>>>>>>> K3. What is the expected behavior if the handle() method
> >>>>>>>> itself
> >>>>>>>>>>>> throws
> >>>>>>>>>>>>> an
> >>>>>>>>>>>>>>>> error?
> >>>>>>>>>>>>>>>> K4. What is the downside of adding an onError() method to
> >>>>>>> the
> >>>>>>>>>>>>> Producer’s
> >>>>>>>>>>>>>>>> Callback interface vs. a new mechanism?
> >>>>>>>>>>>>>>>> K5. Can we change “ProducerExceptionHandlerResponse" to
> >>>>>>> just
> >>>>>>>>>>>> “Response”
> >>>>>>>>>>>>>>>> given that it’s an inner enum?
> >>>>>>>>>>>>>>>> K6. Any recommendation for callback authors to handle
> >>>>>>>> different
> >>>>>>>>>>>>> behavior
> >>>>>>>>>>>>>>>> for different topics?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I’ll echo what others have said, it would help me
> >>>>>>> understand
> >>>>>>>>> why
> >>>>>>>>>> we
> >>>>>>>>>>>>> want
> >>>>>>>>>>>>>>>> another handler class if there were more examples in the
> >>>>>>>>>> Motivation
> >>>>>>>>>>>>>>>> section. As it stands now, I agree with Chris that the
> >>>>>>> stated
> >>>>>>>>>>> issues
> >>>>>>>>>>>>>>> could
> >>>>>>>>>>>>>>>> be solved by adding two new configuration options:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>    oversized.record.behavior=fail
> >>>>>>>>>>>>>>>>    retry.on.unknown.topic.or.partition=true
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> What I’m not yet able to wrap my head around is: what
> >>>>>>> exactly
> >>>>>>>>>> would
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>> logic in the handler be? I’m not very imaginative, so I’m
> >>>>>>>>>> assuming
> >>>>>>>>>>>>> they’d
> >>>>>>>>>>>>>>>> mostly be if-this-then-that. However, if they’re more
> >>>>>>>>>> complicated,
> >>>>>>>>>>>> I’d
> >>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>> other concerns.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>> Kirk
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Apr 22, 2024, at 7:38 AM, Alieh Saeedi
> >>>>>>>>>>>>> <asae...@confluent.io.INVALID
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thank you all for the feedback!
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Addressing the main concern: The KIP is about giving the
> >>>>>>>> user
> >>>>>>>>>> the
> >>>>>>>>>>>>>>> ability
> >>>>>>>>>>>>>>>>> to handle producer exceptions, but to be more
> >> conservative
> >>>>>>>> and
> >>>>>>>>>>> avoid
> >>>>>>>>>>>>>>>> future
> >>>>>>>>>>>>>>>>> issues, we decided to be limited to a short list of
> >>>>>>>>> exceptions.
> >>>>>>>>>> I
> >>>>>>>>>>>>>>>> included
> >>>>>>>>>>>>>>>>> *RecordTooLargeException* and
> >>>>>>>>> *UnknownTopicOrPartitionException*.
> >>>>>>>>>>>> Open
> >>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>> suggestions for adding some more ;-)
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> KIP Updates:
> >>>>>>>>>>>>>>>>> - clarified the way that the user should configure the
> >>>>>>>>> Producer
> >>>>>>>>>> to
> >>>>>>>>>>>> use
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> custom handler. I think adding a producer config
> >> property
> >>>>>>> is
> >>>>>>>>> the
> >>>>>>>>>>>>>>> cleanest
> >>>>>>>>>>>>>>>>> one.
> >>>>>>>>>>>>>>>>> - changed the *ClientExceptionHandler* to
> >>>>>>>>>>> *ProducerExceptionHandler*
> >>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>> closer to what we are changing.
> >>>>>>>>>>>>>>>>> - added the ProducerRecord as the input parameter of the
> >>>>>>>>>> handle()
> >>>>>>>>>>>>>>> method
> >>>>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>> well.
> >>>>>>>>>>>>>>>>> - increased the response types to 3 to have fail and two
> >>>>>>>> types
> >>>>>>>>>> of
> >>>>>>>>>>>>>>>> continue.
> >>>>>>>>>>>>>>>>> - The default behaviour is having no custom handler,
> >>>>>>> having
> >>>>>>>>> the
> >>>>>>>>>>>>>>>>> corresponding config parameter set to null. Therefore,
> >> the
> >>>>>>>> KIP
> >>>>>>>>>>>>> provides
> >>>>>>>>>>>>>>>> no
> >>>>>>>>>>>>>>>>> default implementation of the interface.
> >>>>>>>>>>>>>>>>> - We follow the interface solution as described in the
> >>>>>>>>>>>>>>>>> Rejected Alternatives section.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>> Alieh
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Thu, Apr 18, 2024 at 8:11 PM Matthias J. Sax <
> >>>>>>>>>> mj...@apache.org
> >>>>>>>>>>>>
> >>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thanks for the KIP Alieh! It addresses an important
> >> case
> >>>>>>>> for
> >>>>>>>>>>> error
> >>>>>>>>>>>>>>>>>> handling.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I agree that using this handler would be an expert API,
> >>>>>>> as
> >>>>>>>>>>>> mentioned
> >>>>>>>>>>>>>>> by
> >>>>>>>>>>>>>>>>>> a few people. But I don't think it would be a reason to
> >>>>>>> not
> >>>>>>>>> add
> >>>>>>>>>>> it.
> >>>>>>>>>>>>>>> It's
> >>>>>>>>>>>>>>>>>> always a tricky tradeoff what to expose to users and to
> >>>>>>>> avoid
> >>>>>>>>>>> foot
> >>>>>>>>>>>>>>> guns,
> >>>>>>>>>>>>>>>>>> but we added similar handlers to Kafka Streams, and
> >> have
> >>>>>>>> good
> >>>>>>>>>>>>>>> experience
> >>>>>>>>>>>>>>>>>> with it. Hence, I understand, but don't share the
> >> concern
> >>>>>>>>>> raised.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I also agree that there is some responsibility by the
> >>>>>>> user
> >>>>>>>> to
> >>>>>>>>>>>>>>> understand
> >>>>>>>>>>>>>>>>>> how such a handler should be implemented to not drop
> >> data
> >>>>>>>> by
> >>>>>>>>>>>>> accident.
> >>>>>>>>>>>>>>>>>> But it seems unavoidable and acceptable.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> While I understand that a "simpler / reduced" API (eg
> >> via
> >>>>>>>>>>> configs)
> >>>>>>>>>>>>>>> might
> >>>>>>>>>>>>>>>>>> also work, I personally prefer a full handler. Configs
> >>>>>>> have
> >>>>>>>>> the
> >>>>>>>>>>>> same
> >>>>>>>>>>>>>>>>>> issue that they could be misused, potentially leading
> >> to
> >>>>>>>>>>>> incorrectly
> >>>>>>>>>>>>>>>>>> dropped data, but at the same time are less flexible
> >> (and
> >>>>>>>>> thus
> >>>>>>>>>>>> maybe
> >>>>>>>>>>>>>>>>>> even harder to use correctly...?). Based on my
> >> experience,
> >>>>>>>>> there
> >>>>>>>>>>> is
> >>>>>>>>>>>>>>> also
> >>>>>>>>>>>>>>>>>> often weird corner cases for which it makes sense to also
> >>>>>>>> drop
> >>>>>>>>>>>> records
> >>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>> other exceptions, and a full handler has the advantage
> >> of
> >>>>>>>>> full
> >>>>>>>>>>>>>>>>>> flexibility and "absolute power!".
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> To be fair: I don't know the exact code paths of the
> >>>>>>>> producer
> >>>>>>>>>> in
> >>>>>>>>>>>>>>>>>> detail, so please keep me honest. But my understanding
> >>>>>>> is,
> >>>>>>>>>> that
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>> KIP
> >>>>>>>>>>>>>>>>>> aims to allow users to react to internal exceptions, and
> >>>>>>>>> decide
> >>>>>>>>>> to
> >>>>>>>>>>>>> keep
> >>>>>>>>>>>>>>>>>> retrying internally, swallow the error and drop the
> >>>>>>> record,
> >>>>>>>>> or
> >>>>>>>>>>>> raise
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> error?
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Maybe the KIP would need to be a little bit more
> >> precise
> >>>>>>>>> about what
> >>>>>>>>>>>> errors
> >>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>> want to cover -- I don't think this list must be
> >>>>>>>> exhaustive,
> >>>>>>>>> as
> >>>>>>>>>>> we
> >>>>>>>>>>>>> can
> >>>>>>>>>>>>>>>>>> always do a follow-up KIP to also apply the handler to
> >>>>>>> other
> >>>>>>>>>> errors
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>> expand the scope of the handler. The KIP does mention
> >>>>>>>>> examples,
> >>>>>>>>>>> but
> >>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>> might be good to explicitly state for what cases the
> >>>>>>>> handler
> >>>>>>>>>> gets
> >>>>>>>>>>>>>>>> applied?
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I am also not sure if CONTINUE and FAIL are enough
> >>>>>>> options?
> >>>>>>>>>> Don't
> >>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>> need three options? Or would `CONTINUE` have different
> >>>>>>>>> meaning
> >>>>>>>>>>>>>>> depending
> >>>>>>>>>>>>>>>>>> on the type of error? Ie, for a retryable error
> >>>>>>> `CONTINUE`
> >>>>>>>>>> would
> >>>>>>>>>>>> mean
> >>>>>>>>>>>>>>>>>> keep retrying internally, but for a non-retryable error
> >>>>>>>>>>> `CONTINUE`
> >>>>>>>>>>>>>>> means
> >>>>>>>>>>>>>>>>>> swallow the error and drop the record? This semantic
> >>>>>>>> overload
> >>>>>>>>>>> seems
> >>>>>>>>>>>>>>>>>> tricky to reason about by users, so it might be better to
> >>>>>>>> split
> >>>>>>>>>>>>>>> `CONTINUE`
> >>>>>>>>>>>>>>>>>> into two cases -> `RETRY` and `SWALLOW` (or some better
> >>>>>>>>> names).
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Additionally, should we just ship a
> >>>>>>>>>>> `DefaultClientExceptionHandler`
> >>>>>>>>>>>>>>>>>> which would return `FAIL`, for backward compatibility?
> >> Or
> >>>>>>>>> don't
> >>>>>>>>>>>> have
> >>>>>>>>>>>>>>> any
> >>>>>>>>>>>>>>>>>> default handler to begin with and allow it to be
> >> `null`?
> >>>>>>> I
> >>>>>>>>>> don't
> >>>>>>>>>>>> see
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> need for a specific `TransactionExceptionHandler`. To
> >> me,
> >>>>>>>> the
> >>>>>>>>>>> goal
> >>>>>>>>>>>>>>>>>> should be to not modify the default behavior at all,
> >> but
> >>>>>>> to
> >>>>>>>>>> just
> >>>>>>>>>>>>> allow
> >>>>>>>>>>>>>>>>>> users to change the default behavior if there is a
> >> need.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> What is missing from the KIP, though, is how the handler
> >> is
> >>>>>>>>> passed
> >>>>>>>>>>>> into
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> producer. Would we need a new config which
> >> allows
> >>>>>>>> to
> >>>>>>>>>> set
> >>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>> custom handler? And/or would we allow to pass in an
> >>>>>>>> instance
> >>>>>>>>>> via
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> constructor or add a new method to set a handler?
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On 4/18/24 10:02 AM, Andrew Schofield wrote:
> >>>>>>>>>>>>>>>>>>> Hi Alieh,
> >>>>>>>>>>>>>>>>>>> Thanks for the KIP.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Exception handling in the Kafka producer and consumer
> >> is
> >>>>>>>>>> really
> >>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>> ideal.
> >>>>>>>>>>>>>>>>>>> It’s even harder working out what’s going on with the
> >>>>>>>>>> consumer.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I’m a bit nervous about this KIP and I agree with
> >> Chris
> >>>>>>>> that
> >>>>>>>>>> it
> >>>>>>>>>>>>> could
> >>>>>>>>>>>>>>>> do
> >>>>>>>>>>>>>>>>>> with additional
> >>>>>>>>>>>>>>>>>>> motivation. This would be an expert-level interface
> >>>>>>> given
> >>>>>>>>> how
> >>>>>>>>>>>>>>>> complicated
> >>>>>>>>>>>>>>>>>>> the exception handling for Kafka has become.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 7. The application is not really aware of the batching
> >>>>>>>> being
> >>>>>>>>>>> done
> >>>>>>>>>>>> on
> >>>>>>>>>>>>>>>> its
> >>>>>>>>>>>>>>>>>> behalf.
> >>>>>>>>>>>>>>>>>>> The ProduceResponse can actually return an array of
> >>>>>>>> records
> >>>>>>>>>>> which
> >>>>>>>>>>>>>>>> failed
> >>>>>>>>>>>>>>>>>>> per batch. If you get RecordTooLargeException, and
> >> want
> >>>>>>> to
> >>>>>>>>>>> retry,
> >>>>>>>>>>>>> you
> >>>>>>>>>>>>>>>>>> probably
> >>>>>>>>>>>>>>>>>>> need to remove the offending records from the batch
> >> and
> >>>>>>>>> retry
> >>>>>>>>>>> it.
> >>>>>>>>>>>>>>> This
> >>>>>>>>>>>>>>>>>> is getting fiddly.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 8. There is already o.a.k.clients.producer.Callback. I
> >>>>>>>>> wonder
> >>>>>>>>>>>>> whether
> >>>>>>>>>>>>>>>> an
> >>>>>>>>>>>>>>>>>>> alternative might be to add a method to the existing
> >>>>>>>>> Callback
> >>>>>>>>>>>>>>>> interface,
> >>>>>>>>>>>>>>>>>> such as:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>  ClientExceptionResponse onException(Exception
> >>>>>>> exception)
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> It would be called when a ProduceResponse contains an
> >>>>>>>> error,
> >>>>>>>>>> but
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> producer is going to retry. It tells the producer
> >>>>>>> whether
> >>>>>>>> to
> >>>>>>>>>> go
> >>>>>>>>>>>>> ahead
> >>>>>>>>>>>>>>>>>> with the retry
> >>>>>>>>>>>>>>>>>>> or not. The default implementation would be to
> >> CONTINUE,
> >>>>>>>>>> because
> >>>>>>>>>>>>>>> that’s
> >>>>>>>>>>>>>>>>>>> just continuing to retry as planned. Note that this
> >> is a
> >>>>>>>>>>>> per-record
> >>>>>>>>>>>>>>>>>> callback, so
> >>>>>>>>>>>>>>>>>>> the application would be able to understand which
> >>>>>>> records
> >>>>>>>>>>> failed.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> By using an existing interface, we already know how to
> >>>>>>>>>> configure
> >>>>>>>>>>>> it
> >>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>> we know
> >>>>>>>>>>>>>>>>>>> about the threading model for calling it.
> >>>>>>>>>>>>>>>>>>>
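The Callback-based alternative in point 8 above could be sketched as a default method on a callback-like interface. `CallbackSketch` below is a hypothetical stand-in for `o.a.k.clients.producer.Callback` (its `onCompletion` takes a plain `Object` instead of the real metadata type). The `FAIL` constant and the `FailFastOnIllegalState` class are assumptions added for illustration, since the message only names CONTINUE.

```java
// Assumed response type; only CONTINUE is named in the message above.
enum ClientExceptionResponse { CONTINUE, FAIL }

// Hypothetical stand-in for the producer's Callback interface.
interface CallbackSketch {
    void onCompletion(Object metadata, Exception exception);

    // Proposed addition: asked before the producer retries a failed record.
    // Defaulting to CONTINUE keeps existing callbacks behaving as they do today.
    default ClientExceptionResponse onException(Exception exception) {
        return ClientExceptionResponse.CONTINUE;
    }
}

// Example override: give up on one exception type, keep retrying otherwise.
final class FailFastOnIllegalState implements CallbackSketch {
    @Override
    public void onCompletion(Object metadata, Exception exception) {
        // No-op in this sketch.
    }

    @Override
    public ClientExceptionResponse onException(Exception exception) {
        return exception instanceof IllegalStateException
                ? ClientExceptionResponse.FAIL
                : ClientExceptionResponse.CONTINUE;
    }
}
```

Because the new method has a default body, an existing lambda such as `(metadata, exception) -> {}` would still compile against the extended interface.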
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>> Andrew
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On 17 Apr 2024, at 18:17, Chris Egerton <chr...@aiven.io.INVALID> wrote:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Hi Alieh,
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Thanks for the KIP! The issue with writing to non-existent topics is
> >>>>>>>>>>>>>>>>>>>> particularly frustrating for users of Kafka Connect and has been the
> >>>>>>>>>>>>>>>>>>>> source of a handful of Jira tickets over the years. My thoughts:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 1. An additional detail we can add to the motivation (or possibly
> >>>>>>>>>>>>>>>>>>>> rejected alternatives) section is that this kind of custom retry logic
> >>>>>>>>>>>>>>>>>>>> can't be implemented by hand by, e.g., setting retries to 0 in the
> >>>>>>>>>>>>>>>>>>>> producer config and handling exceptions at the application level. Or
> >>>>>>>>>>>>>>>>>>>> rather, it can, but 1) it's a bit painful to have to reimplement at every
> >>>>>>>>>>>>>>>>>>>> call-site for Producer::send (and any code that awaits the returned
> >>>>>>>>>>>>>>>>>>>> Future) and 2) it's impossible to do this without losing idempotency on
> >>>>>>>>>>>>>>>>>>>> retries.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 2. That said, I wonder if a pluggable interface is really the right call
> >>>>>>>>>>>>>>>>>>>> here. Documenting the interactions of a producer with
> >>>>>>>>>>>>>>>>>>>> a ClientExceptionHandler instance will be tricky, and implementing them
> >>>>>>>>>>>>>>>>>>>> will also be a fair amount of work. I believe that there needs to be some
> >>>>>>>>>>>>>>>>>>>> more granularity for how writes to non-existent topics (or really,
> >>>>>>>>>>>>>>>>>>>> UNKNOWN_TOPIC_OR_PARTITION and related errors from the broker) are
> >>>>>>>>>>>>>>>>>>>> handled, but I'm torn between keeping it simple with maybe one or two new
> >>>>>>>>>>>>>>>>>>>> producer config properties, or a full-blown pluggable interface. If there
> >>>>>>>>>>>>>>>>>>>> are more cases that would benefit from a pluggable interface, it would be
> >>>>>>>>>>>>>>>>>>>> nice to identify these and add them to the KIP to strengthen the
> >>>>>>>>>>>>>>>>>>>> motivation. Right now, I'm not sure the two that are mentioned in the
> >>>>>>>>>>>>>>>>>>>> motivation are sufficient.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 3. Alternatively, a possible compromise is for this KIP to introduce new
> >>>>>>>>>>>>>>>>>>>> properties that dictate how to handle unknown-topic-partition and
> >>>>>>>>>>>>>>>>>>>> record-too-large errors, with the thinking that if we introduce a
> >>>>>>>>>>>>>>>>>>>> pluggable interface later on, these properties will be recognized by the
> >>>>>>>>>>>>>>>>>>>> default implementation of that interface but could be completely ignored
> >>>>>>>>>>>>>>>>>>>> or replaced by alternative implementations.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 4. (Nit) You can remove the "This page is meant as a template for
> >>>>>>>>>>>>>>>>>>>> writing a KIP..." part from the KIP. It's not a template anymore :)
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 5. If we do go the pluggable interface route, wouldn't we want to add
> >>>>>>>>>>>>>>>>>>>> the possibility for retry logic? The simplest version of this could be to
> >>>>>>>>>>>>>>>>>>>> add a RETRY value to the ClientExceptionHandlerResponse enum.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 6. I think "SKIP" or "DROP" might be clearer instead of "CONTINUE" for
> >>>>>>>>>>>>>>>>>>>> the ClientExceptionHandlerResponse enum, since they cause records to be
> >>>>>>>>>>>>>>>>>>>> dropped.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Chris
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On Wed, Apr 17, 2024 at 12:25 PM Justine Olshan
> >>>>>>>>>>>>>>>>>>>> <jols...@confluent.io.invalid> wrote:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Hey Alieh,
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> I echo what Omnia says, I'm not sure I understand the implications of
> >>>>>>>>>>>>>>>>>>>>> the change and I think more detail is needed.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> This comment also confused me a bit:
> >>>>>>>>>>>>>>>>>>>>> * {@code ClientExceptionHandler} that continues the transaction even if a
> >>>>>>>>>>>>>>>>>>>>> record is too large.
> >>>>>>>>>>>>>>>>>>>>> * Otherwise, it makes the transaction to fail.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Relatedly, I've been working with some folks on a KIP for transactions
> >>>>>>>>>>>>>>>>>>>>> errors and how they are handled. Specifically for the
> >>>>>>>>>>>>>>>>>>>>> RecordTooLargeException (and a few other errors), we want to give a new
> >>>>>>>>>>>>>>>>>>>>> error category for this error that allows the application to choose how
> >>>>>>>>>>>>>>>>>>>>> it is handled. Maybe this KIP is something that you are looking for? Stay
> >>>>>>>>>>>>>>>>>>>>> tuned :)
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Justine
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On Wed, Apr 17, 2024 at 8:03 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
> >>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Hi Alieh,
> >>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP! I have a couple of comments:
> >>>>>>>>>>>>>>>>>>>>>> - You mentioned in the KIP motivation,
> >>>>>>>>>>>>>>>>>>>>>>> Another example for which a production exception handler could be
> >>>>>>>>>>>>>>>>>>>>>>> useful is if a user tries to write into a non-existing topic, which
> >>>>>>>>>>>>>>>>>>>>>>> returns a retryable error code; with infinite retries, the producer
> >>>>>>>>>>>>>>>>>>>>>>> would hang retrying forever. A handler could help to break the infinite
> >>>>>>>>>>>>>>>>>>>>>>> retry loop.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> How can the handler differentiate between something that is temporary,
> >>>>>>>>>>>>>>>>>>>>>> where it should keep retrying, and something permanent, like forgetting
> >>>>>>>>>>>>>>>>>>>>>> to create the topic? Temporary here could be:
> >>>>>>>>>>>>>>>>>>>>>>   * the producer getting deployed before the topic creation finishes
> >>>>>>>>>>>>>>>>>>>>>>     (especially if the topic creation is handled via IaC)
> >>>>>>>>>>>>>>>>>>>>>>   * temporarily offline partitions
> >>>>>>>>>>>>>>>>>>>>>>   * leadership changing
> >>>>>>>>>>>>>>>>>>>>>> Isn’t this putting the producer at risk of dropping records
> >>>>>>>>>>>>>>>>>>>>>> unintentionally?
> >>>>>>>>>>>>>>>>>>>>>> - Can you elaborate more on what is written in the compatibility /
> >>>>>>>>>>>>>>>>>>>>>> migration plan section, please, by explaining in a bit more detail what
> >>>>>>>>>>>>>>>>>>>>>> the changing behaviour is and how this will impact clients who are
> >>>>>>>>>>>>>>>>>>>>>> upgrading?
> >>>>>>>>>>>>>>>>>>>>>> - In the proposed changes, can you elaborate in the KIP on where in the
> >>>>>>>>>>>>>>>>>>>>>> producer lifecycle ClientExceptionHandler and
> >>>>>>>>>>>>>>>>>>>>>> TransactionExceptionHandler will get triggered, and how the producer
> >>>>>>>>>>>>>>>>>>>>>> will be configured to point to a custom implementation?
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Thanks
> >>>>>>>>>>>>>>>>>>>>>> Omnia
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> On 17 Apr 2024, at 13:13, Alieh Saeedi <asae...@confluent.io.INVALID>
> >>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>>>>>>> Here is the KIP-1038: Add Custom Error Handler to Producer.
> >>>>>>>>>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-1038%3A+Add+Custom+Error+Handler+to+Producer>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> I look forward to your feedback!
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>>>>>>> Alieh
>
>
>
