Hi Alieh, Thanks for the updates! I just have a few more thoughts:
- I don't think a boolean property is sufficient to dictate retries for unknown topic partitions, though. These errors can occur if a topic has just been created, which can happen if, for example, automatic topic creation is enabled for a multi-task connector. This is why I proposed a timeout instead of a boolean (and see my previous email for why reducing max.block.ms for a producer is not a viable alternative). If it helps, one way to reproduce this yourself is to add the line `fooProps.put(TASKS_MAX_CONFIG, "10");` to the integration test here: https://github.com/apache/kafka/blob/5439914c32fa00d634efa7219699f1bc21add839/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java#L134 and then check the logs afterward for messages like "Error while fetching metadata with correlation id <n> : {foo-topic=UNKNOWN_TOPIC_OR_PARTITION}".
- I also don't think we need custom XxxResponse enums for every possible method; it seems like this will lead to a lot of duplication and cognitive overhead if we want to expand the error handler in the future. Something more flexible like RetriableResponse and NonRetriableResponse could suffice.
- Finally, the KIP still doesn't state how the handler will or won't take precedence over existing retry properties. If I set `retries` or `delivery.timeout.ms` or `max.block.ms` to low values, will that cause retries to cease even if my custom handler would otherwise keep returning RETRY for an error?

Cheers,
Chris

On Mon, May 13, 2024 at 11:02 AM Andrew Schofield <andrew_schofi...@live.com> wrote: > Hi Alieh, > Just a few more comments on the KIP. It is looking much less risky now the > scope > is tighter. > > [AJS1] It would be nice to have default implementations of the handle > methods > so an implementor would not need to implement both themselves. > > [AJS2] Producer configurations which are class names usually end in > “.class”. > I suggest “custom.exception.handler.class”.
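Andrew's [AJS1] suggestion maps naturally onto Java `default` interface methods. The following is a minimal sketch, not the KIP's actual API: it assumes the two `handle` overloads Alieh describes, swaps in the `NonRetriableResponse`/`RetriableResponse` enums Chris floats, and uses local stand-in classes (`SimpleRecord` and the two exception types) so it compiles without kafka-clients. The chosen defaults (FAIL for an oversized record, RETRY for an unknown topic/partition) are an assumption meant to mirror current producer behavior.

```java
import java.io.Closeable;

// Local stand-ins so the sketch compiles without kafka-clients; the real classes
// live in org.apache.kafka.common.errors and org.apache.kafka.clients.producer.
class RecordTooLargeException extends RuntimeException {
    RecordTooLargeException(String message) { super(message); }
}

class UnknownTopicOrPartitionException extends RuntimeException {
    UnknownTopicOrPartitionException(String message) { super(message); }
}

final class SimpleRecord {
    final String topic;

    SimpleRecord(String topic) { this.topic = topic; }
}

// Hypothetical response types, following the RetriableResponse/NonRetriableResponse idea.
enum NonRetriableResponse { FAIL, SWALLOW }

enum RetriableResponse { FAIL, SWALLOW, RETRY }

// Hypothetical handler: every method has a default, so implementors override only what they need.
interface ProducerExceptionHandler extends Closeable {

    // Assumed default: an oversized record fails the send, as it does today.
    default NonRetriableResponse handle(RecordTooLargeException e, SimpleRecord record) {
        return NonRetriableResponse.FAIL;
    }

    // Assumed default: keep retrying, like today's wait for metadata (bounded by max.block.ms).
    default RetriableResponse handle(UnknownTopicOrPartitionException e, SimpleRecord record) {
        return RetriableResponse.RETRY;
    }

    // Nothing to release by default; dropping IOException from the signature keeps callers simple.
    @Override
    default void close() { }
}

// An implementor that customizes only the oversized-record case.
class DropLargeRecordsHandler implements ProducerExceptionHandler {
    @Override
    public NonRetriableResponse handle(RecordTooLargeException e, SimpleRecord record) {
        return NonRetriableResponse.SWALLOW;
    }
}
```

`DropLargeRecordsHandler` overrides a single method and inherits the unknown-topic default, which is exactly the "implementor would not need to implement both" property.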
> > [AJS3] If I implemented a handler, and I set a non-default value for one > of the > new configurations, what happens? I would expect that the handler takes > precedence. I wasn’t quite clear what “the control will follow the handler > instructions” meant. > > [AJS4] Because you now have an enum for the > RecordTooLargeExceptionResponse, > I don’t think you need to state in the comment for > ProducerExceptionHandler that > RETRY will be interpreted as FAIL. > > Thanks, > Andrew > > > On 13 May 2024, at 14:53, Alieh Saeedi <asae...@confluent.io.INVALID> > wrote: > > > > Hi all, > > > > > > Thanks for the very interesting discussion during my PTO. > > > > > > KIP updates and addressing concerns: > > > > > > 1) Two handle() methods are defined in ProducerExceptionHandler for the > two > > exceptions with different input parameters so that we have > > handle(RecordTooLargeException e, ProducerRecord record) and > > handle(UnknownTopicOrPartitionException e, ProducerRecord record). > > > > > > 2) The ProducerExceptionHandler extends `Closeable` as well. > > > > > > 3) The KIP suggests having two more configuration parameters with boolean > > values: > > > > - `drop.invalid.large.records` with a default value of `false` for > > swallowing records that are too large. > > > > - `retry.unknown.topic.partition` with a default value of `true` that > > performs RETRY for `max.block.ms` ms when encountering the > > UnknownTopicOrPartitionException. > > > > > > Hope the main concerns are addressed so that we can go forward with > voting. > > > > > > Cheers, > > > > Alieh > > > > On Thu, May 9, 2024 at 11:25 PM Artem Livshits > > <alivsh...@confluent.io.invalid> wrote: > > > >> Hi Matthias, > >> > >>> [AL1] While I see the point, I would think having a different callback > >> for every exception might not really be elegant? > >> > >> I'm not sure how to assess the level of elegance of the proposal, but I > can > >> comment on the technical characteristics: > >> > >> 1.
Having specific interfaces that codify the logic that is currently > >> prescribed in the comments reduces the chance of making a mistake. > >> Comments may get ignored, misunderstood, etc., but if the contract is > >> codified, the compiler will help to enforce the contract. > >> 2. Given that the logic is trickier than it seems (the record-too-large > is > >> an example that can easily confuse someone who's not intimately familiar > >> with the nuances of the batching logic), having a few more hoops to > jump through > >> would give a greater chance that whoever tries to add a new case pauses > >> and thinks a bit more. > >> 3. As Justine pointed out, having different methods will be a forcing > >> function to go through a KIP rather than smuggle new cases through > >> implementation. > >> 4. Sort of a consequence of the previous 3 -- all those things reduce > the > >> chance of someone writing code that works with 2 errors and then, > when > >> more errors are added in the future, suddenly and incorrectly ignores new > >> errors (the example I gave in the previous email). > >> > >>> [AL2 cont.] Similar to AL1, I see such a handler to some extent as > >> business logic. If a user puts a bad filter condition in their KS app, > and > >> drops messages > >> > >> I agree that there is always a chance to get a bug and lose messages, > but > >> there is generally a separation of concerns with different risk > profiles: > >> the filtering logic may be more rigorously tested and rarely changed > (say > >> an application developer does it), but setting the topics to produce to > may be > >> done via configuration (e.g. a user of the application does it) and it's > >> generally an expectation that users would get an error when > configuration > >> is incorrect. > >> > >> What could be worse is that UnknownTopicOrPartitionException can be an > >> intermittent error, i.e.
with a generally correct configuration, there > >> could be a metadata propagation problem on the cluster and then a random > set > >> of records could get lost. > >> > >>> [AL3] Maybe I misunderstand what you are saying, but to me, checking > the > >> size of the record upfront is exactly what the KIP proposes? No? > >> > >> It achieves the same result but solves it differently. My proposal: > >> > >> 1. Application checks the validity of a record (maybe via a new > >> validateRecord method) before producing it, and can just exclude it or > >> return an error to the user. > >> 2. Application produces the record -- at this point there are no records > >> that could return record too large; they were either skipped at step 1 > or > >> we didn't get here because step 1 failed. > >> > >> Vs. KIP's proposal > >> > >> 1. Application produces the record. > >> 2. Application gets a callback. > >> 3. Application returns the action on how to proceed. > >> > >> The advantage of the former is the clarity of semantics -- the record is > >> invalid (a property of the record, not a function of server state or > server > >> configuration) and we can clearly know that it is the record that is bad > >> and can never succeed. > >> > >> The KIP-proposed way actually has a very tricky point: it only > handles > >> a subset of record-too-large exceptions. The broker can return > >> record-too-large and reject the whole batch (but we don't want to ignore > >> those because then we can skip random records that just happened to be > in > >> the same batch); in some sense we use the same error for 2 different > >> conditions, and understanding that requires a pretty deep understanding of > >> Kafka internals. > >> > >> -Artem > >> > >> > >> On Wed, May 8, 2024 at 9:47 AM Justine Olshan > <jols...@confluent.io.invalid > >>> > >> wrote: > >> > >>> My concern with respect to it being fragile: the code that ensures the > >>> error type is internal to the producer.
Someone may see it and say, I > >> want > >>> to add such and such error. This looks like internal code, so I don't > >> need > >>> a KIP, and then they can change it to whatever they want, thinking it is > >>> within the typical Kafka improvement process. > >>> > >>> Relying on an internal change to enforce an external API is fragile in > my > >>> opinion. That's why I sort of agreed with Artem about enforcing the > error > >> in > >>> the method signature -- part of the public API. > >>> > >>> Chris's comments on requiring more information for the handler again make > me > >>> wonder if we are solving a problem of lack of information at the > >>> application level with a more powerful solution than we need. (I.e., if > we > >>> had more information, could the application close and restart the > >>> transaction rather than having to drop records?) But I am happy to > >>> compromise with a handler that we can agree is sufficiently controlled > >> and > >>> documented. > >>> > >>> Justine > >>> > >>> On Wed, May 8, 2024 at 7:20 AM Chris Egerton <chr...@aiven.io.invalid> > >>> wrote: > >>> > >>>> Hi Alieh, > >>>> > >>>> Continuing prior discussions: > >>>> > >>>> 1) Regarding the "flexibility" discussion, my overarching point is > >> that I > >>>> don't see the point in allowing for this kind of pluggable logic > >> without > >>>> also covering more scenarios. Take example 2 in the KIP: if we're > going > >>> to > >>>> implement retries only on "important" topics when a topic partition > >> isn't > >>>> found, why wouldn't we also want to be able to do this for other > >> errors? > >>>> Again, taking authorization errors as an example, why wouldn't we want > >> to > >>>> be able to fail when we can't write to "important" topics because the > >>>> producer principal lacks sufficient ACLs, and drop the record if the > >>> topic > >>>> isn't "important"?
In a security-conscious environment with > >>>> runtime-dependent topic routing (which is a common feature of many > >> source > >>>> connectors, such as the Debezium connectors), this seems fairly > likely. > >>>> > >>>> 2) As far as changing the shape of the API goes, I like Artem's idea > of > >>>> splitting out the interface based on specific exceptions. This may be > a > >>>> little laborious to expand in the future, but if we really want to > >>>> limit the exceptions that we cover with the handler and move slowly > and > >>>> cautiously, then IMO it'd be reasonable to reflect that in the > >>> interface. I > >>>> also acknowledge that there's no way to completely prevent people from > >>>> shooting themselves in the foot by implementing the API incorrectly, > >> but > >>> I > >>>> think it's worth it to do what we can--including leveraging the Java > >>>> language's type system--to help them, so IMO there's value in > >> eliminating > >>>> the implicit behavior of failing when a policy returns RETRY for a > >>>> non-retriable error. This can take a variety of shapes and I'm not > >> going > >>> to > >>>> insist on anything specific, but I do want to again raise my concerns > >>> with > >>>> the current proposal and request that we find something a little > >> better. > >>>> > >>>> 3) Concerning the default implementation--actually, I meant what I > >> wrote > >>> :) > >>>> I don't want a "second" default, I want an implementation of this > >>> interface > >>>> to be used as the default if no others are specified. The behavior of > >>> this > >>>> default implementation would be identical to existing behavior (so > >> there > >>>> would be no backwards compatibility concerns like the ones raised by > >>>> Matthias), but it would be possible to configure this default handler > >>> class > >>>> to behave differently for a basic set of scenarios.
This would mirror > >>> (pun > >>>> intended) the approach we've taken with Mirror Maker 2 and its > >>>> ReplicationPolicy interface [1]. There is a default implementation > >>>> available [2] that recognizes a handful of basic configuration > >> properties > >>>> [3] for simple tweaks, but if users want, they can also implement > their > >>> own > >>>> replication policy for more fine-grained logic if those properties > >> aren't > >>>> flexible enough. > >>>> > >>>> More concretely, I'm imagining something like this for the producer > >>>> exception handler: > >>>> > >>>> - Default implementation class > >>>> of org.apache.kafka.clients.producer.DefaultProducerExceptionHandler > >>>> - This class would recognize two properties: > >>>> - drop.invalid.large.records: Boolean property, defaults to false. If > >>>> "false", then causes the handler to return FAIL whenever > >>>> a RecordTooLargeException is encountered; if "true", then causes > >>>> SWALLOW/SKIP/DROP to be returned instead > >>>> - unknown.topic.partition.retry.timeout.ms: Integer property, > >> defaults > >>>> to > >>>> INT_MAX. Whenever an UnknownTopicOrPartitionException is encountered, > >>>> causes the handler to return FAIL if that record has been pending for > >>> more > >>>> than the retry timeout; otherwise, causes RETRY to be returned > >>>> > >>>> I think this is worth addressing now instead of later because it > forces > >>> us > >>>> to evaluate the usefulness of this interface and it addresses a > >>>> long-standing issue not just with Kafka Connect, but with the Java > >>> producer > >>>> in general. For reference, here are a few tickets I collected after > >>> briefly > >>>> skimming our Jira showing that this is a real pain point for users: > >>>> https://issues.apache.org/jira/browse/KAFKA-10340, > >>>> https://issues.apache.org/jira/browse/KAFKA-12990, > >>>> https://issues.apache.org/jira/browse/KAFKA-13634. 
Although this is > >>>> frequently reported with Kafka Connect, it applies to anyone who > >>> configures > >>>> a producer to use a high retry timeout. I am aware of the > max.block.ms > >>>> property, but it's painful and IMO poor behavior to require users to > >>> reduce > >>>> the value of this property just to handle the single scenario when > >> trying > >>>> to write to topics that don't exist, since it would also limit the > >> retry > >>>> timeout for other operations that are legitimately retriable. > >>>> > >>>> Raising new points: > >>>> > >>>> 5) I don't see the interplay between this handler and existing > >>>> retry-related properties mentioned anywhere in the KIP. I'm assuming > >> that > >>>> properties like "retries", "max.block.ms", and "delivery.timeout.ms" > >>> would > >>>> take precedence over the handler and once they are exhausted, the > >>>> record/batch will fail no matter what? If so, it's probably worth > >> briefly > >>>> mentioning this (no more than a sentence or two) in the KIP, and if > >> not, > >>>> I'm curious what you have in mind. > >>>> > >>>> 6) I also wonder if the API provides enough information in its current > >>>> form. Would it be possible to provide handlers with some way of > >> tracking > >>>> how long a record has been pending for (i.e., how long it's been since > >>> the > >>>> record was provided as an argument to Producer::send)? One way to do > >> this > >>>> could be to add a method like `onNewRecord(ProducerRecord)` and > >>>> allow/require the handler to do its own bookkeeping, probably with a > >>>> matching `onRecordSuccess(ProducerRecord)` method so that the handler > >>>> doesn't eat up an ever-increasing amount of memory trying to track > >>> records. 
> >>>> An alternative could be to include information about the initial time > >> the > >>>> record was received by the producer and the number of retries that > have > >>>> been performed for it as parameters in the handle method(s), but I'm > >> not > >>>> sure how easy this would be to implement and it might clutter things > >> up a > >>>> bit too much. > >>>> > >>>> 7) A small request--can we add Closeable (or, if you prefer, > >>> AutoCloseable) > >>>> as a superinterface for the handler interface? > >>>> > >>>> [1] - > >>>> > >>>> > >>> > >> > https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/ReplicationPolicy.html > >>>> [2] - > >>>> > >>>> > >>> > >> > https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html > >>>> [3] - > >>>> > >>>> > >>> > >> > https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html#SEPARATOR_CONFIG > >>>> , > >>>> > >>>> > >>> > >> > https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html#INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG > >>>> > >>>> Cheers, > >>>> > >>>> Chris > >>>> > >>>> On Wed, May 8, 2024 at 12:37 AM Matthias J. Sax <mj...@apache.org> > >>> wrote: > >>>> > >>>>> Very interesting discussion. > >>>>> > >>>>> Seems a central point is the question about "how generic" we approach > >>>>> this, and some people think we need to be conservative and others > >> think > >>>>> we should try to be as generic as possible. > >>>>> > >>>>> Personally, I think following a limited scope for this KIP (by > >>>>> explicitly saying we only cover RecordTooLarge and > >>>>> UnknownTopicOrPartition) might be better. We have concrete tickets > >> that > >>>>> we address, while for other exceptions (like authorization) we don't > >>> know > >>>>> if people want to handle them to begin with.
Boiling the ocean might > >> not > >>>>> get us too far, and being somewhat pragmatic might help to move this > >>> KIP > >>>>> forward. -- I also agree with Justine and Artem that we want to be > >>>>> careful anyway to not allow users to break stuff too easily. > >>>>> > >>>>> At the same time, I agree that we should set up this change in a > >> forward-looking > >>>>> way, and thus having a single generic handler allows us to > >>> later > >>>>> extend the handler more easily. This should also simplify a follow-up > >> KIP > >>>>> that might add new error cases (I actually mentioned one more to > >> Alieh > >>>>> already, but we both agreed that it might be best to exclude it from > >>> the > >>>>> KIP right now, to make the 3.8 deadline. Doing a follow-up KIP is not > >>>>> the end of the world.) > >>>>> > >>>>> > >>>>> > >>>>> @Chris: > >>>>> > >>>>> (2) This sounds fair to me, but not sure how "bad" it actually would > >>> be? > >>>>> If the contract is clearly defined, it seems to be fine what the KIP > >>>>> proposes, and given that such a handler is an expert API, and we can > >>>>> provide "best practices" (cf. my other comment below in [AL1]), being > >> a > >>>>> little bit pragmatic sounds fine to me. > >>>>> > >>>>> Not sure if I understand Justine's argument on this question? > >>>>> > >>>>> > >>>>> (3) About having a default impl or not. I am fine with adding one, > >> even > >>>>> if I am not sure why Connect could not just add its own and plug > >> it > >>>>> in (and we would add corresponding configs for Connect, but not for > >> the > >>>>> producer itself)? For this case, we could also do this as a follow-up > >>>>> KIP, but happy to include it in this KIP to provide value to Connect > >>>>> right away (even if the value might not come right away if we miss > >> the > >>>>> 3.8 deadline due to expanded KIP scope...)
-- For KS, we would for > >>> sure > >>>>> plug in our own impl, and lock down the config such that users cannot > >>> set > >>>>> their own handler on the internal producer to begin with. Might be > >> good > >>>>> to elaborate why the producer should have a default? We might > >> actually > >>>>> want to add this to the KIP right away? > >>>>> > >>>>> The key for a default impl would be to not change the current > >>> behavior, > >>>>> and having no default seems to achieve this. For the two cases you > >>>>> mentioned, it's unclear to me what default value on "upper bound on > >>>>> retries" for UnknownTopicOrPartitionException we should set? Seems it > >>>>> would need to be the same as `delivery.timeout.ms`? However, if > >> users > >>>>> have `delivery.timeout.ms` actually overridden we would need to set > >>>>> this config somewhat "dynamically"? Is this feasible? If we hard-code 2 > >>>>> minutes, it might not be backward compatible. I have the impression > >> we > >>>>> might introduce some undesired coupling? -- For the "record too > >> large" > >>>>> case, the config seems to be boolean and setting it to `false` by > >>>>> default seems to provide backward compatibility. > >>>>> > >>>>> > >>>>> > >>>>> @Artem: > >>>>> > >>>>> [AL1] While I see the point, I would think having a different > >> callback > >>>>> for every exception might not really be elegant? In the end, the > >>> handler > >>>>> is a very advanced feature anyway, and if it's implemented in a bad > >>>>> way, well, it's a user error -- we cannot protect users from > >>> everything. > >>>>> To me, a handler like this is to some extent "business logic" and > >> if a > >>>>> user gets business logic wrong, it's hard to protect them. -- We > >> would > >>>>> of course provide best practice guidance in the JavaDocs, and explain > >>>>> that a handler should have explicit `if` statements for stuff it wants > >>> to > >>>>> handle, and only a single default which returns FAIL.
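Matthias's best-practice guidance (explicit `if` statements per handled case, plus a single default that returns FAIL) can be illustrated against the generic `handle(ProducerRecord, Exception)` signature discussed earlier in the thread. This is a minimal sketch with hypothetical stand-in types (`SimpleRecord`, `Response`, and an "important topics" rule borrowed from the KIP example Chris mentions); it is not code from the KIP. Because unhandled exceptions fall through to FAIL, an exception type added in a later release is not silently swallowed, which is the failure mode Artem describes under [AL1].

```java
import java.util.Set;

// Local stand-ins for the Kafka exception and record types.
class RecordTooLargeException extends RuntimeException {
    RecordTooLargeException(String message) { super(message); }
}

class UnknownTopicOrPartitionException extends RuntimeException {
    UnknownTopicOrPartitionException(String message) { super(message); }
}

final class SimpleRecord {
    final String topic;

    SimpleRecord(String topic) { this.topic = topic; }
}

// The three actions named in the thread.
enum Response { FAIL, RETRY, SWALLOW }

class ExplicitCaseHandler {
    // Hypothetical rule: records for these topics must never be dropped.
    private final Set<String> importantTopics;

    ExplicitCaseHandler(Set<String> importantTopics) { this.importantTopics = importantTopics; }

    Response handle(SimpleRecord record, Exception e) {
        boolean important = importantTopics.contains(record.topic);
        if (e instanceof RecordTooLargeException) {
            // Can never succeed: drop it unless the topic is important.
            return important ? Response.FAIL : Response.SWALLOW;
        }
        if (e instanceof UnknownTopicOrPartitionException) {
            // May be transient (e.g. a just-created topic): retry only for important topics.
            return important ? Response.RETRY : Response.FAIL;
        }
        // The single default: anything else, including exception types a later
        // release might start passing in, fails instead of being silently dropped.
        return Response.FAIL;
    }
}
```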
> >>>>> > >>>>> > >>>>> [AL2] Yes, but for KS we would retry at the application layer. I.e., > >> the > >>>>> TX is not completed, we clean up and set up our task from scratch, to > >>>>> ensure the pending transaction is completed before we resume. If the > >> TX > >>>>> was indeed aborted, we would retry from an older offset and thus just > >> hit > >>>>> the same error again and the loop begins again. > >>>>> > >>>>> > >>>>> [AL2 cont.] Similar to AL1, I see such a handler to some extent as > >>>>> business logic. If a user puts a bad filter condition in their KS > >> app, > >>>>> and drops messages, there is nothing we can do about it, and this handler > >>>>> IMHO has a similar purpose. This is also the line of thinking I > >> apply > >>>>> to EOS, to address Justine's concern about "should we allow dropping > >> for > >>>>> EOS", and my answer is "yes", because it's more business logic than > >>>>> actual error handling IMHO. And by default, we fail... So users > >> opt in > >>>>> to add business logic to drop records. It's an application-level > >>>>> decision how to write the code. > >>>>> > >>>>> > >>>>> [AL3] Maybe I misunderstand what you are saying, but to me, checking > >>> the > >>>>> size of the record upfront is exactly what the KIP proposes? No? > >>>>> > >>>>> > >>>>> > >>>>> @Justine: > >>>>> > >>>>>> I saw the sample > >>>>>> code -- is it just an if statement checking for the error before > >> the > >>>>>> handler is invoked? That seems a bit fragile. > >>>>> > >>>>> What do you mean by fragile? Not sure if I see your point. > >>>>> > >>>>> > >>>>> > >>>>> > >>>>> -Matthias > >>>>> > >>>>> On 5/7/24 5:33 PM, Artem Livshits wrote: > >>>>>> Hi Alieh, > >>>>>> > >>>>>> Thanks for the KIP. The motivation talks about very specific > >> cases, > >>>> but > >>>>>> the interface is generic. > >>>>>> > >>>>>> [AL1] > >>>>>> If the interface evolves in the future I think we could have the > >>>>> following > >>>>>> confusion: > >>>>>> > >>>>>> 1.
A user implemented SWALLOW action for both > >> RecordTooLargeException > >>>> and > >>>>>> UnknownTopicOrPartitionException. For simplicity they just return > >>>> SWALLOW > >>>>>> from the function, because it elegantly handles all known cases. > >>>>>> 2. The interface has evolved to support a new exception. > >>>>>> 3. The user has upgraded their Kafka client. > >>>>>> > >>>>>> Now a new kind of error gets dropped on the floor without the user's > >>>>> intention > >>>>>> and it would be super hard to detect and debug. > >>>>>> > >>>>>> To avoid the confusion, I think we should use handlers for specific > >>>>>> exceptions. Then if a new exception is added it won't get silently > >>>>> swallowed > >>>>>> because the user would need to add new functionality to handle it. > >>>>>> > >>>>>> I also have some higher-level comments: > >>>>>> > >>>>>> [AL2] > >>>>>>> it throws a TimeoutException, and the user can only blindly retry, > >>>> which > >>>>>> may result in an infinite retry loop > >>>>>> > >>>>>> If the TimeoutException happens during transactional processing > >>>> (exactly > >>>>>> once is the desired semantics), then the client should not retry > >> when > >>>> it > >>>>>> gets TimeoutException because without knowing the reason for > >>>>>> TimeoutExceptions, the client cannot know whether the message actually > >>>>> got > >>>>>> produced or not and retrying the message may result in duplicates. > >>>>>> > >>>>>>> The thrown TimeoutException "cuts" the connection to the > >> underlying > >>>> root > >>>>>> cause of missing metadata > >>>>>> > >>>>>> Maybe we should fix the error handling and return the proper > >>> underlying > >>>>>> message? Then the application can properly handle the message > >> based > >>> on > >>>>>> preferences. > >>>>>> > >>>>>> From the product perspective, it's not clear how safe it is to > >>> blindly > >>>>>> ignore UnknownTopicOrPartitionException.
This could lead to > >>> situations > >>>>>> where a simple typo could lead to massive data loss (part of the > >> data > >>>>> would > >>>>>> effectively be produced to a "black hole" and the user may not > >> notice > >>>> it > >>>>>> for a while). > >>>>>> > >>>>>> In which situations would you recommend that the user "black hole" > >>>> messages > >>>>>> in case of misconfiguration? > >>>>>> > >>>>>> [AL3] > >>>>>> > >>>>>>> If the custom handler decides on SWALLOW for > >>> RecordTooLargeException, > >>>>>> > >>>>>> Is my understanding correct that this KIP proposes functionality > >> that > >>>>> would > >>>>>> only be able to SWALLOW a RecordTooLargeException that happens because > >>> the > >>>>>> producer cannot produce the record (if the broker rejects the > >> batch, > >>>> the > >>>>>> error won't get to the handler, because we cannot know which other > >>>>> records > >>>>>> get ignored)? In this case, why not just check the locally > >>> configured > >>>>> max > >>>>>> record size upfront and not produce the record in the first place? > >>>>> Maybe > >>>>>> we can expose a validation function from the producer that could > >>>> validate > >>>>>> the records locally, so we don't need to produce the record in > >> order > >>> to > >>>>>> know that it's invalid. > >>>>>> > >>>>>> -Artem > >>>>>> > >>>>>> On Tue, May 7, 2024 at 2:07 PM Justine Olshan > >>>>> <jols...@confluent.io.invalid> > >>>>>> wrote: > >>>>>> > >>>>>>> Alieh and Chris, > >>>>>>> > >>>>>>> Thanks for clarifying 1), but I saw the motivation. I guess I just > >>>> didn't > >>>>>>> understand how that would be ensured on the producer side. I saw > >> the > >>>>> sample > >>>>>>> code -- is it just an if statement checking for the error before > >> the > >>>>>>> handler is invoked? That seems a bit fragile.
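The "if statement checking for the error before the handler is invoked" that Justine asks about amounts to producer-side gating: the producer itself decides which exceptions ever reach the handler. The following is a minimal sketch of that idea with hypothetical names (`ProducerSideGate`, `dispatch`, `GenericHandler`, `Action`) and local stand-in exception classes; it is not the KIP's implementation. It shows the two properties discussed in the thread: the scope limit lives only in internal code, and a RETRY answer for a record that can never succeed is silently reinterpreted as FAIL.

```java
// Local stand-ins for the Kafka exception types.
class RecordTooLargeException extends RuntimeException {
    RecordTooLargeException(String message) { super(message); }
}

class UnknownTopicOrPartitionException extends RuntimeException {
    UnknownTopicOrPartitionException(String message) { super(message); }
}

enum Action { FAIL, RETRY, SWALLOW }

// Hypothetical generic user-facing signature: it accepts any Exception.
interface GenericHandler {
    Action handle(String topic, Exception e);
}

// Hypothetical producer-internal gate: the only thing limiting the handler's scope.
final class ProducerSideGate {
    private final GenericHandler handler;

    ProducerSideGate(GenericHandler handler) { this.handler = handler; }

    Action dispatch(String topic, Exception e) {
        if (e instanceof RecordTooLargeException) {
            Action action = handler.handle(topic, e);
            // The record can never succeed, so a RETRY answer is silently treated as FAIL.
            return action == Action.RETRY ? Action.FAIL : action;
        }
        if (e instanceof UnknownTopicOrPartitionException) {
            return handler.handle(topic, e);
        }
        // Any other error never reaches the handler; FAIL stands in for the
        // producer's existing handling of that error.
        return Action.FAIL;
    }
}
```

Adding a third `instanceof` branch to `dispatch` would widen what every user handler sees without any change to `GenericHandler`, which is the kind of internal-only change Justine is worried about.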
> >>>>>>> > >>>>>>> Can you clarify what you mean by `since the code does not reach > >> the > >>> KS > >>>>>>> interface and breaks somewhere in producer.` If we surfaced this > >>> error > >>>>> to > >>>>>>> the application in a better way, would that also be a solution to > >> the > >>>>> issue? > >>>>>>> > >>>>>>> Justine > >>>>>>> > >>>>>>> On Tue, May 7, 2024 at 1:55 PM Alieh Saeedi > >>>>> <asae...@confluent.io.invalid> > >>>>>>> wrote: > >>>>>>> > >>>>>>>> Hi, > >>>>>>>> > >>>>>>>> > >>>>>>>> Thank you, Chris and Justine, for the feedback. > >>>>>>>> > >>>>>>>> > >>>>>>>> @Chris > >>>>>>>> > >>>>>>>> 1) Flexibility: it has two meanings. The first meaning is the one > >>> you > >>>>>>>> mentioned. We are going to cover more exceptions in the future, > >> but > >>>> as > >>>>>>>> Justine mentioned, we must be very conservative about adding more > >>>>>>>> exceptions. Additionally, flexibility mainly means that the user > >> is > >>>>> able > >>>>>>> to > >>>>>>>> develop their own code. As mentioned in the motivation section > >> and > >>>> the > >>>>>>>> examples, sometimes the user decides on dropping a record based > >> on > >>>> the > >>>>>>>> topic, for example. > >>>>>>>> > >>>>>>>> > >>>>>>>> 2) Defining two separate methods for retriable and non-retriable > >>>>>>>> exceptions: although the idea is brilliant, the user may still > >>> make a > >>>>>>>> mistake by implementing the wrong method and see unexpected > >>>>>>> behaviour. > >>>>>>>> For example, he may implement handleRetriable() for > >>>>>>> RecordTooLargeException > >>>>>>>> and define SWALLOW for the exception, but in practice, he sees no > >>>>> change > >>>>>>> in > >>>>>>>> default behaviour since he implemented the wrong method. I think > >> we > >>>> can > >>>>>>>> never reduce the user’s mistakes to 0.
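Alieh's concern can be made concrete with the two-method shape Chris proposed (`handle` returning a `NonRetriableResponse`, `handleRetriable` returning a `RetriableResponse`). The following is a minimal sketch under stated assumptions: the producer routes each error by whether it is retriable, the interface methods have defaults, and the exception classes are local stand-ins (mirroring kafka-clients, where `UnknownTopicOrPartitionException` is a subclass of `RetriableException` and `RecordTooLargeException` is not). `RetriabilityRouter` and `MisplacedHandler` are hypothetical names.

```java
// Local stand-ins mirroring the kafka-clients hierarchy.
class RetriableException extends RuntimeException {
    RetriableException(String message) { super(message); }
}

class UnknownTopicOrPartitionException extends RetriableException {
    UnknownTopicOrPartitionException(String message) { super(message); }
}

class RecordTooLargeException extends RuntimeException {
    RecordTooLargeException(String message) { super(message); }
}

// No RETRY constant: retrying a non-retriable error cannot even be expressed.
enum NonRetriableResponse { FAIL, SWALLOW }

enum RetriableResponse { FAIL, SWALLOW, RETRY }

// Hypothetical two-method handler; the defaults are an assumption of this sketch.
interface TwoMethodHandler {
    default NonRetriableResponse handle(String topic, Exception e) {
        return NonRetriableResponse.FAIL;
    }

    default RetriableResponse handleRetriable(String topic, Exception e) {
        return RetriableResponse.RETRY;
    }
}

// Hypothetical producer-side routing: the exception's type, not the user, picks the method.
final class RetriabilityRouter {
    static String route(TwoMethodHandler handler, String topic, Exception e) {
        if (e instanceof RetriableException) {
            return handler.handleRetriable(topic, e).name();
        }
        return handler.handle(topic, e).name();
    }
}

// The mistake Alieh describes: SWALLOW logic placed in the retriable method.
class MisplacedHandler implements TwoMethodHandler {
    @Override
    public RetriableResponse handleRetriable(String topic, Exception e) {
        if (e instanceof RecordTooLargeException) {
            return RetriableResponse.SWALLOW; // never reached: this exception is routed to handle()
        }
        return RetriableResponse.RETRY;
    }
}
```

Routing a `RecordTooLargeException` through `MisplacedHandler` still yields FAIL, so the misplaced override compiles but has no effect. The trade-off Chris points to is that the implicit "RETRY means FAIL" rule disappears, because `NonRetriableResponse` has no RETRY value to misinterpret.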
> >>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>>> 3) Default implementation for Handler: the default behaviour is > >>>> already > >>>>>>>> preserved with NO need to implement any handler or set the > >>>>>>>> corresponding config parameter `custom.exception.handler`. What > >> you > >>>>> mean > >>>>>>> is > >>>>>>>> actually having a second default, which requires having both > >>>> interface > >>>>>>> and > >>>>>>>> config parameters. About UnknownTopicOrPartitionException: the > >>>> producer > >>>>>>>> already offers the config parameter `max.block.ms`, which > >>> determines > >>>>> the > >>>>>>>> duration of retrying. The main purpose of the user who needs the > >>>>> handler > >>>>>>> is > >>>>>>>> to get the root cause of TimeoutException and handle it in the > >> way > >>> he > >>>>>>>> intends. The KIP explains the necessity of it for KS users. > >>>>>>>> > >>>>>>>> > >>>>>>>> 4) Naming issue: By SWALLOW, we meant actually swallowing the error, > >>>> while > >>>>>>>> SKIP means skipping the record, I think. If it makes sense for more > >>> ppl, > >>>> I > >>>>>>> can > >>>>>>>> change it to SKIP. > >>>>>>>> > >>>>>>>> > >>>>>>>> @Justine > >>>>>>>> > >>>>>>>> 1) was addressed by Chris. > >>>>>>>> > >>>>>>>> 2 and 3) The problem is exactly what you mentioned. Currently, > >>> there > >>>> is > >>>>>>> no > >>>>>>>> way to handle these issues application-side. Even KS users who > >>>>> implement > >>>>>>> KS > >>>>>>>> ProductionExceptionHandler are not able to handle the exceptions > >> as > >>>>> they > >>>>>>>> intend since the code does not reach the KS interface and breaks > >>>>>>> somewhere > >>>>>>>> in producer.
> >>>>>>>> > >>>>>>>> Cheers, > >>>>>>>> Alieh > >>>>>>>> > >>>>>>>> On Tue, May 7, 2024 at 8:43 PM Chris Egerton < > >>>> fearthecel...@gmail.com> > >>>>>>>> wrote: > >>>>>>>> > >>>>>>>>> Hi Justine, > >>>>>>>>> > >>>>>>>>> The method signatures for the interface are indeed open-ended, > >> but > >>>> the > >>>>>>>> KIP > >>>>>>>>> states that its uses will be limited. See the motivation > >> section: > >>>>>>>>> > >>>>>>>>>> We believe that the user should be able to develop custom > >>> exception > >>>>>>>>> handlers for managing producer exceptions. On the other hand, > >> this > >>>>> will > >>>>>>>> be > >>>>>>>>> an expert-level API, and using that may result in strange > >>> behaviour > >>>> in > >>>>>>>> the > >>>>>>>>> system, making it hard to find the root cause. Therefore, the > >>> custom > >>>>>>>>> handler is currently limited to handling RecordTooLargeException > >>> and > >>>>>>>>> UnknownTopicOrPartitionException. > >>>>>>>>> > >>>>>>>>> Cheers, > >>>>>>>>> > >>>>>>>>> Chris > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> On Tue, May 7, 2024, 14:37 Justine Olshan > >>>>> <jols...@confluent.io.invalid > >>>>>>>> > >>>>>>>>> wrote: > >>>>>>>>> > >>>>>>>>>> Hi Alieh, > >>>>>>>>>> > >>>>>>>>>> I was out for KSB and then was also sick. :( > >>>>>>>>>> > >>>>>>>>>> To your point 1) Chris, I don't think it is limited to two > >>> specific > >>>>>>>>>> scenarios, since the interface accepts a generic Exception e > >> and > >>>> can > >>>>>>> be > >>>>>>>>>> implemented to check if that e is an instanceof any exception. > >> I > >>>>>>> didn't > >>>>>>>>> see > >>>>>>>>>> anywhere that specific errors are enforced. I'm a bit concerned > >>>> about > >>>>>>>>> this > >>>>>>>>>> actually. I'm concerned about the open-endedness and the > >>> contract > >>>>>>> we > >>>>>>>>> have > >>>>>>>>>> with transactions. We are allowing the client to make decisions > >>>> that > >>>>>>>> are > >>>>>>>>>> somewhat invisible to the server.
As an aside, can we build in > >>> log > >>>>>>>>> messages > >>>>>>>>>> when the handler decides to skip (or otherwise drop) a message? I'm really > >>>> concerned > >>>>>>>>> about > >>>>>>>>>> messages being silently dropped. > >>>>>>>>>> > >>>>>>>>>> I do think Chris's point 2) about retriable vs. non-retriable > >>> errors > >>>>>>> is > >>>>>>>>>> fair. I'm a bit concerned about skipping an unknown topic or > >>>> partition > >>>>>>>>>> exception too early, as there are cases where it can be > >>> transient. > >>>>>>>>>> > >>>>>>>>>> I'm still a little bit wary of allowing records to be dropped as > >> part > >>> of > >>>>>>> EOS > >>>>>>>>>> generally, as in many cases, these errors signify an issue with > >>> the > >>>>>>>>> original > >>>>>>>>>> data. I understand that streams and connect/mirror maker may > >> have > >>>>>>>> reasons > >>>>>>>>>> they want to progress past these messages, but I'm wondering if > >> there > >>>> is > >>>>>>> a > >>>>>>>>> way > >>>>>>>>>> that this can be done application-side. I'm willing to accept this > >>> sort > >>>> of > >>>>>>>>>> proposal if we can make it clear that this sort of thing is > >>>> happening > >>>>>>>> and > >>>>>>>>>> we limit the blast radius for what we can do. > >>>>>>>>>> > >>>>>>>>>> Justine > >>>>>>>>>> > >>>>>>>>>> On Tue, May 7, 2024 at 9:55 AM Chris Egerton > >>>> <chr...@aiven.io.invalid > >>>>>>>> > >>>>>>>>>> wrote: > >>>>>>>>>> > >>>>>>>>>>> Hi Alieh, > >>>>>>>>>>> > >>>>>>>>>>> Sorry for the delay, I've been out sick. I still have some > >>>> thoughts > >>>>>>>>> that > >>>>>>>>>>> I'd like to see addressed before voting. > >>>>>>>>>>> > >>>>>>>>>>> 1) If flexibility is the motivation for a pluggable interface, > >>> why > >>>>>>>> are > >>>>>>>>> we > >>>>>>>>>>> only limiting the uses for this interface to two very specific > >>>>>>>>> scenarios?
> >>>>>>>>>>> Why not also allow, e.g., authorization errors to be handled > >> as > >>>>>>> well > >>>>>>>>>>> (allowing users to drop records destined for some off-limits > >>>>>>> topics, > >>>>>>>> or > >>>>>>>>>>> retry for a limited duration in case there's a delay in the > >>>>>>>> propagation > >>>>>>>>>> of > >>>>>>>>>>> ACL updates)? It'd be nice to see some analysis of other > >> errors > >>>>>>> that > >>>>>>>>>> could > >>>>>>>>>>> be handled with this new API, both to avoid the follow-up work > >>> of > >>>>>>>>> another > >>>>>>>>>>> KIP to address them in the future, and to make sure that we're > >>> not > >>>>>>>>>> painting > >>>>>>>>>>> ourselves into a corner with the current API in a way that > >> would > >>>>>>> make > >>>>>>>>>>> future modifications difficult. > >>>>>>>>>>> > >>>>>>>>>>> 2) Something feels a bit off with how retriable vs. > >>> non-retriable > >>>>>>>>> errors > >>>>>>>>>>> are handled with the interface. Why not introduce two separate > >>>>>>>> methods > >>>>>>>>> to > >>>>>>>>>>> handle each case separately? That way there's no ambiguity or > >>>>>>>> implicit > >>>>>>>>>>> behavior when, e.g., attempting to retry on a > >>>>>>>> RecordTooLargeException. > >>>>>>>>>> This > >>>>>>>>>>> could be something like `NonRetriableResponse > >>>>>>> handle(ProducerRecord, > >>>>>>>>>>> Exception)` and `RetriableResponse > >>> handleRetriable(ProducerRecord, > >>>>>>>>>>> Exception)`, though the exact names and shape can obviously be > >>>>>>> toyed > >>>>>>>>>> with a > >>>>>>>>>>> bit. > >>>>>>>>>>> > >>>>>>>>>>> 3) Although the flexibility of a pluggable interface may > >> benefit > >>>>>>> some > >>>>>>>>>>> users' custom producer applications and Kafka Streams > >>>> applications, > >>>>>>>> it > >>>>>>>>>>> comes at significant deployment cost for other low-/no-code > >>>>>>>>> environments, > >>>>>>>>>>> including but not limited to Kafka Connect and MirrorMaker 2. 
> >>> Can > >>>>>>> we > >>>>>>>>> add > >>>>>>>>>> a > >>>>>>>>>>> default implementation of the exception handler that allows > >> for > >>>>>>> some > >>>>>>>>>> simple > >>>>>>>>>>> behavior to be tweaked via configuration property? Two things > >>> that > >>>>>>>>> would > >>>>>>>>>> be > >>>>>>>>>>> nice to have would be A) an upper bound on the retry time for > >>>>>>>>>>> unknown-topic-partition exceptions and B) an option to drop > >>>> records > >>>>>>>>> that > >>>>>>>>>>> are large enough to trigger a record-too-large exception. > >>>>>>>>>>> > >>>>>>>>>>> 4) I'd still prefer to see "SKIP" or "DROP" instead of the > >>>> proposed > >>>>>>>>>>> "SWALLOW" option, which IMO is opaque and non-obvious, > >>> especially > >>>>>>>> when > >>>>>>>>>>> trying to guess the behavior for retriable errors. > >>>>>>>>>>> > >>>>>>>>>>> Cheers, > >>>>>>>>>>> > >>>>>>>>>>> Chris > >>>>>>>>>>> > >>>>>>>>>>> On Fri, May 3, 2024 at 11:23 AM Alieh Saeedi > >>>>>>>>>> <asae...@confluent.io.invalid > >>>>>>>>>>>> > >>>>>>>>>>> wrote: > >>>>>>>>>>> > >>>>>>>>>>>> Hi all, > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> A summary of the KIP and the discussions: > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> The KIP introduces a handler interface for Producer in order > >> to > >>>>>>>>> handle > >>>>>>>>>>> two > >>>>>>>>>>>> exceptions: RecordTooLargeException and > >>>>>>>>>> UnknownTopicOrPartitionException. > >>>>>>>>>>>> The handler handles the exceptions per-record. > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> - Do we need this handler? [Motivation and Examples > >> sections] > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> RecordTooLargeException: 1) In transactions, the producer > >>>>>>> collects > >>>>>>>>>>> multiple > >>>>>>>>>>>> records in batches. Then a RecordTooLargeException related > >> to a > >>>>>>>>> single > >>>>>>>>>>>> record leads to failing the entire batch. 
A custom exception > >>>>>>>> handler > >>>>>>>>> in > >>>>>>>>>>>> this case may decide on dropping the record and continuing > >> the > >>>>>>>>>>> processing. > >>>>>>>>>>>> See Example 1, please. 2) Moreover, in Kafka Streams, a > >> record > >>>>>>>> that > >>>>>>>>> is > >>>>>>>>>>> too > >>>>>>>>>>>> large is a poison pill record, and there is no way to skip > >> over > >>>>>>>> it. A > >>>>>>>>>>>> handler would allow us to react to this error inside the > >>>>>>> producer, > >>>>>>>>>> i.e., > >>>>>>>>>>>> local to where the error happens, and thus simplify the > >> overall > >>>>>>>> code > >>>>>>>>>>>> significantly. Please read the Motivation section for more > >>>>>>>>> explanation. > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> UnknownTopicOrPartitionException: For this case, the producer > >>>>>>>> handles > >>>>>>>>>>> this > >>>>>>>>>>>> exception internally and only issues a WARN log about missing > >>>>>>>>> metadata > >>>>>>>>>>> and > >>>>>>>>>>>> retries internally. Later, when the producer hits > >>>>>>>>> "delivery.timeout.ms", > >>>>>>>>>>> it > >>>>>>>>>>>> throws a TimeoutException, and the user can only blindly > >> retry, > >>>>>>>> which > >>>>>>>>>> may > >>>>>>>>>>>> result in an infinite retry loop. The thrown TimeoutException > >>>>>>>> "cuts" > >>>>>>>>>> the > >>>>>>>>>>>> connection to the underlying root cause of missing metadata > >>>>>>> (which > >>>>>>>>>> could > >>>>>>>>>>>> indeed be a transient error but is persistent for a > >>> non-existent > >>>>>>>>>> topic). > >>>>>>>>>>>> Thus, there is no programmatic way to break the infinite > >> retry > >>>>>>>> loop. > >>>>>>>>>>> Kafka > >>>>>>>>>>>> Streams also blindly retries for this case, and the > >> application > >>>>>>>> gets > >>>>>>>>>>> stuck.
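The UnknownTopicOrPartitionException paragraph points out that there is no programmatic way to stop retrying a topic that will never appear. Below is a minimal sketch of a handler that retries a missing topic only for a bounded time and then fails. Everything in it is hypothetical. `Response`, `Handler`, and `UnknownTopicStandIn` stand in for the types discussed in the KIP. The real proposal passes a whole `ProducerRecord`, which is simplified here to its topic. The time budget and the injectable clock are this sketch's own choices.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

final class UnknownTopicSketch {
    // Response values as discussed in this thread (hypothetical, not a released API).
    enum Response { RETRY, SWALLOW, FAIL }

    // Stand-in for org.apache.kafka.common.errors.UnknownTopicOrPartitionException,
    // so the sketch compiles without the Kafka client jar.
    static final class UnknownTopicStandIn extends RuntimeException {
        UnknownTopicStandIn(String message) { super(message); }
    }

    // Simplified handler shape: only the record's topic is passed in.
    interface Handler {
        Response handle(String topic, Exception exception);
    }

    // Retries unknown-topic errors for at most maxRetryMs per topic, then fails,
    // so a topic that never appears cannot cause an endless retry loop.
    static final class BoundedUnknownTopicHandler implements Handler {
        private final long maxRetryMs;
        private final LongSupplier clockMs;                 // injectable for tests
        private final Map<String, Long> firstSeenMs = new HashMap<>();

        BoundedUnknownTopicHandler(long maxRetryMs, LongSupplier clockMs) {
            this.maxRetryMs = maxRetryMs;
            this.clockMs = clockMs;
        }

        @Override
        public Response handle(String topic, Exception exception) {
            if (!(exception instanceof UnknownTopicStandIn)) {
                return Response.FAIL;                       // not this handler's concern
            }
            long now = clockMs.getAsLong();
            long start = firstSeenMs.computeIfAbsent(topic, t -> now);
            return now - start < maxRetryMs ? Response.RETRY : Response.FAIL;
        }
    }
}
```

Clearing a topic's entry once sends to it start succeeding is left out for brevity.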
> >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> - Having interface vs configuration option: [Motivation, > >>>>>>> Examples, > >>>>>>>>> and > >>>>>>>>>>>> Rejected Alternatives sections] > >>>>>>>>>>>> > >>>>>>>>>>>> Our solution is introducing an interface due to the full > >>>>>>>> flexibility > >>>>>>>>>> that > >>>>>>>>>>>> it offers. Sometimes users, especially Kafka Streams ones, > >>>>>>>> determine > >>>>>>>>>> the > >>>>>>>>>>>> handler's behaviour based on the situation. For example, > >>>>>>>>>>>> facing UnknownTopicOrPartitionException, the user may want > >> to > >>>>>>>> raise > >>>>>>>>> an > >>>>>>>>>>>> error for some topics but retry it for other topics. Having a > >>>>>>>>>>> configuration > >>>>>>>>>>>> option with a fixed set of possibilities does not serve the > >>>>>>> user's > >>>>>>>>>>>> needs. See Example 2, please. > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> - Note on RecordTooLargeException: [Public Interfaces > >> section] > >>>>>>>>>>>> > >>>>>>>>>>>> If the custom handler decides on SWALLOW for > >>>>>>>> RecordTooLargeException, > >>>>>>>>>>> then > >>>>>>>>>>>> this record will not be a part of the batch of transactions > >> and > >>>>>>>> will > >>>>>>>>>> also > >>>>>>>>>>>> not be sent to the broker in non-transactional mode. So there is no > >>>>>>> risk > >>>>>>>>>> of > >>>>>>>>>>>> getting a RecordTooLargeException from the broker in this > >> case, > >>>>>>> as > >>>>>>>>> the > >>>>>>>>>>>> record will never be sent to the broker. SWALLOW means > >>> drop > >>>>>>>> the > >>>>>>>>>>> record > >>>>>>>>>>>> and continue/swallow the error. > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> - What if the handle() method implements RETRY for > >>>>>>>>>>> RecordTooLargeException? > >>>>>>>>>>>> [Proposed Changes section] > >>>>>>>>>>>> > >>>>>>>>>>>> We have to limit the user to only have FAIL or SWALLOW for > >>>>>>>>>>>> RecordTooLargeException. Actually, RETRY must be equal to > >> FAIL.
> >>>>>>>> This > >>>>>>>>> is > >>>>>>>>>>>> clearly documented in the javadoc. > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> - What if the handle() method of the handler throws an > >>> exception? > >>>>>>>>>>>> > >>>>>>>>>>>> The handler is expected to have correct code. If it throws an > >>>>>>>>>> exception, > >>>>>>>>>>>> everything fails. > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> This is a PoC PR <https://github.com/apache/kafka/pull/15846 > >>> > >>>>>>> ONLY > >>>>>>>>> for > >>>>>>>>>>>> RecordTooLargeException. The code changes related to > >>>>>>>>>>>> UnknownTopicOrPartitionException will be added to this PR > >>> LATER. > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> Looking forward to your feedback again. > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> Cheers, > >>>>>>>>>>>> > >>>>>>>>>>>> Alieh > >>>>>>>>>>>> > >>>>>>>>>>>> On Thu, Apr 25, 2024 at 11:46 PM Kirk True < > >> k...@kirktrue.pro> > >>>>>>>>> wrote: > >>>>>>>>>>>> > >>>>>>>>>>>>> Hi Alieh, > >>>>>>>>>>>>> > >>>>>>>>>>>>> Thanks for the updates! > >>>>>>>>>>>>> > >>>>>>>>>>>>> Comments inline... > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>>> On Apr 25, 2024, at 1:10 PM, Alieh Saeedi > >>>>>>>>>>> <asae...@confluent.io.INVALID > >>>>>>>>>>>>> > >>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Hi all, > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Thanks a lot for the constructive feedback! > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Addressing some of the main concerns: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> - The `RecordTooLargeException` can be thrown by the broker, > >>>>>>>> producer, > >>>>>>>>>> and > >>>>>>>>>>>>>> consumer. Of course, the `ProducerExceptionHandler` > >> interface > >>>>>>>> is > >>>>>>>>>>>>> introduced > >>>>>>>>>>>>>> to affect only the exceptions thrown from the producer.
> >> This > >>>>>>>> KIP > >>>>>>>>>> very > >>>>>>>>>>>>>> specifically means to provide a possibility to manage the > >>>>>>>>>>>>>> `RecordTooLargeException` thrown from the Producer.send() > >>>>>>>> method. > >>>>>>>>>>>> Please > >>>>>>>>>>>>>> see “Proposed Changes” section for more clarity. I > >>>>>>> investigated > >>>>>>>>> the > >>>>>>>>>>>> issue > >>>>>>>>>>>>>> there thoroughly. I hope it can explain the concern about > >> how > >>>>>>>> we > >>>>>>>>>>> handle > >>>>>>>>>>>>> the > >>>>>>>>>>>>>> errors as well. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> - The problem with Callback: Methods of Callback are called > >>>>>>>> when > >>>>>>>>>> the > >>>>>>>>>>>>> record > >>>>>>>>>>>>>> sent to the server is acknowledged, while this is not the > >>>>>>>> desired > >>>>>>>>>>> time > >>>>>>>>>>>>> for > >>>>>>>>>>>>>> all exceptions. We intend to handle exceptions beforehand. > >>>>>>>>>>>>> > >>>>>>>>>>>>> I guess it makes sense to keep the expectation for when > >>>>>>> Callback > >>>>>>>> is > >>>>>>>>>>>>> invoked as-is vs. shoehorning more into it. > >>>>>>>>>>>>> > >>>>>>>>>>>>>> - What if the custom handler returns RETRY for > >>>>>>>>>>>>> `RecordTooLargeException`? I > >>>>>>>>>>>>>> assume changing the producer configuration at runtime is > >>>>>>>>> possible. > >>>>>>>>>> If > >>>>>>>>>>>> so, > >>>>>>>>>>>>>> RETRY for a too large record is valid because maybe in the > >>>>>>> next > >>>>>>>>>> try, > >>>>>>>>>>>> the > >>>>>>>>>>>>>> too large record is not poisoning any more. I am not 100% > >>>>>>> sure > >>>>>>>>>> about > >>>>>>>>>>>> the > >>>>>>>>>>>>>> technical details, though. Otherwise, we can consider the > >>>>>>> RETRY > >>>>>>>>> as > >>>>>>>>>>> FAIL > >>>>>>>>>>>>> for > >>>>>>>>>>>>>> this exception. Another solution would be to consider a > >>>>>>>> constant > >>>>>>>>>>> number > >>>>>>>>>>>>> of > >>>>>>>>>>>>>> times for RETRY which can be useful for other exceptions as > >>>>>>>> well. 
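The paragraph above floats two fallbacks for a RETRY response: treat RETRY as FAIL for an oversized record, or allow RETRY only a fixed number of times. Both could be enforced by a wrapper around any user handler, as in the sketch below. It rests on stated assumptions. `Response`, `Handler`, `RecordTooLargeStandIn`, and `guarded` are hypothetical names, and the caller is assumed to track how many attempts have been made for the record.

```java
final class ResponseGuardSketch {
    enum Response { RETRY, SWALLOW, FAIL }

    // Stand-in for org.apache.kafka.common.errors.RecordTooLargeException.
    static final class RecordTooLargeStandIn extends RuntimeException {
        RecordTooLargeStandIn(String message) { super(message); }
    }

    interface Handler {
        Response handle(String topic, Exception exception);
    }

    // Consults the user's handler, then applies both fallbacks:
    // RETRY for an oversized record becomes FAIL, and any other RETRY is
    // honoured only while attemptsSoFar < maxRetries.
    static Response guarded(Handler handler, String topic, Exception exception,
                            int attemptsSoFar, int maxRetries) {
        Response response = handler.handle(topic, exception);
        if (response != Response.RETRY) {
            return response;                // SWALLOW and FAIL pass through unchanged
        }
        if (exception instanceof RecordTooLargeStandIn) {
            return Response.FAIL;           // the same record would be rejected again
        }
        return attemptsSoFar < maxRetries ? Response.RETRY : Response.FAIL;
    }
}
```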
> >>>>>>>>>>>>> > >>>>>>>>>>>>> It’s not presently possible to change the configuration of > >> an > >>>>>>>>>> existing > >>>>>>>>>>>>> Producer at runtime. So if a record hits a > >>>>>>>> RecordTooLargeException > >>>>>>>>>>> once, > >>>>>>>>>>>> no > >>>>>>>>>>>>> amount of retrying (with the current Producer) will change > >>> that > >>>>>>>>> fact. > >>>>>>>>>>> So > >>>>>>>>>>>>> I’m still a little stuck on how to handle a response of > >> RETRY > >>>>>>> for > >>>>>>>>> an > >>>>>>>>>>>>> “oversized” record. > >>>>>>>>>>>>> > >>>>>>>>>>>>>> - What if the handle() method itself throws an exception? I > >>>>>>>> think > >>>>>>>>>>>>>> rationally and pragmatically, the behaviour must be exactly > >>>>>>>> like > >>>>>>>>>> when > >>>>>>>>>>>> no > >>>>>>>>>>>>>> custom handler is defined since the user actually did not > >>>>>>> have > >>>>>>>> a > >>>>>>>>>>>> working > >>>>>>>>>>>>>> handler. > >>>>>>>>>>>>> > >>>>>>>>>>>>> I’m not convinced that ignoring an errant handler is the > >> right > >>>>>>>>>> choice. > >>>>>>>>>>> It > >>>>>>>>>>>>> then becomes a silent failure that might have repercussions, > >>>>>>>>>> depending > >>>>>>>>>>> on > >>>>>>>>>>>>> the business logic. A user would have to proactively trawl > >>>>>>>> through > >>>>>>>>>> the > >>>>>>>>>>>>> logs for WARN/ERROR messages to catch it. > >>>>>>>>>>>>> > >>>>>>>>>>>>> Throwing a hard error is pretty draconian, though… > >>>>>>>>>>>>> > >>>>>>>>>>>>>> - Why not use config parameters instead of an interface? As > >>>>>>>>>> explained > >>>>>>>>>>>> in > >>>>>>>>>>>>>> the “Rejected Alternatives” section, we assume that the > >>>>>>> handler > >>>>>>>>>> will > >>>>>>>>>>> be > >>>>>>>>>>>>>> used for a greater number of exceptions in the future. > >>>>>>>> Defining a > >>>>>>>>>>>>>> configuration parameter for each exception may make the > >>>>>>>>>>> configuration a > >>>>>>>>>>>>> bit > >>>>>>>>>>>>>> messy. Moreover, the handler offers more flexibility.
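The flexibility argument, echoing Example 2 earlier in the thread (raise an error for some topics, retry for others), is easiest to see in a handler whose decision depends on the topic. A single fixed-value config option cannot express that kind of rule. In the sketch below, the types are hypothetical stand-ins for those discussed in the KIP, and the two topic sets are illustrative.

```java
import java.util.Set;

final class PerTopicSketch {
    enum Response { RETRY, SWALLOW, FAIL }

    // Stand-ins for the Kafka exception classes, to avoid the client jar.
    static final class UnknownTopicStandIn extends RuntimeException {
        UnknownTopicStandIn(String message) { super(message); }
    }
    static final class RecordTooLargeStandIn extends RuntimeException {
        RecordTooLargeStandIn(String message) { super(message); }
    }

    interface Handler {
        Response handle(String topic, Exception exception);
    }

    // The decision depends on the topic, which one fixed config value cannot capture.
    static final class PerTopicHandler implements Handler {
        private final Set<String> expectedSoonTopics; // e.g. topics created on demand
        private final Set<String> lossyTopics;        // dropping an oversized record is acceptable

        PerTopicHandler(Set<String> expectedSoonTopics, Set<String> lossyTopics) {
            this.expectedSoonTopics = Set.copyOf(expectedSoonTopics);
            this.lossyTopics = Set.copyOf(lossyTopics);
        }

        @Override
        public Response handle(String topic, Exception exception) {
            if (exception instanceof UnknownTopicStandIn) {
                return expectedSoonTopics.contains(topic) ? Response.RETRY : Response.FAIL;
            }
            if (exception instanceof RecordTooLargeStandIn) {
                return lossyTopics.contains(topic) ? Response.SWALLOW : Response.FAIL;
            }
            return Response.FAIL;
        }
    }
}
```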
> >>>>>>>>>>>>> > >>>>>>>>>>>>> Agreed that the logic-via-configuration approach is weird > >> and > >>>>>>>>>> limiting. > >>>>>>>>>>>>> Forget I ever suggested it ;) > >>>>>>>>>>>>> > >>>>>>>>>>>>> I’d think additional background in the Motivation section > >>> would > >>>>>>>>> help > >>>>>>>>>> me > >>>>>>>>>>>>> understand how users might use this feature beyond a) > >> skipping > >>>>>>>>>>>> “oversized” > >>>>>>>>>>>>> records, and b) not retrying missing topics. > >>>>>>>>>>>>> > >>>>>>>>>>>>>> Small change: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> -ProductionExceptionHandlerResponse -> Response for brevity > >>>>>>> and > >>>>>>>>>>>>> simplicity. > >>>>>>>>>>>>>> Could’ve been HandlerResponse too I think! > >>>>>>>>>>>>> > >>>>>>>>>>>>> The name change sounds good to me. > >>>>>>>>>>>>> > >>>>>>>>>>>>> Thanks Alieh! > >>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> I thank you all again for your useful > >> questions/suggestions. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> I would be happy to hear more of your concerns, as stated > >> in > >>>>>>>> some > >>>>>>>>>>>>> feedback. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Cheers, > >>>>>>>>>>>>>> Alieh > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> On Wed, Apr 24, 2024 at 12:31 AM Justine Olshan > >>>>>>>>>>>>>> <jols...@confluent.io.invalid> wrote: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Thanks Alieh for the updates. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> I'm a little concerned about the design pattern here. It > >>>>>>> seems > >>>>>>>>>> like > >>>>>>>>>>> we > >>>>>>>>>>>>> want > >>>>>>>>>>>>>>> specific usages, but we are packaging it as a generic > >>>>>>> handler. > >>>>>>>>>>>>>>> I think we tried to narrow down on the specific errors we > >>>>>>> want > >>>>>>>>> to > >>>>>>>>>>>>> handle, > >>>>>>>>>>>>>>> but it feels a little clunky as we have a generic thing > >> for > >>>>>>>> two > >>>>>>>>>>>> specific > >>>>>>>>>>>>>>> errors. 
> >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> I'm wondering if we are using the right patterns to solve > >>>>>>>> these > >>>>>>>>>>>>> problems. I > >>>>>>>>>>>>>>> agree though that we will need something more than the > >> error > >>>>>>>>>> classes > >>>>>>>>>>>> I'm > >>>>>>>>>>>>>>> proposing if we want to have different handling be > >>>>>>>> configurable. > >>>>>>>>>>>>>>> My concern is that the open-endedness of a handler means > >>>>>>> that > >>>>>>>> we > >>>>>>>>>> are > >>>>>>>>>>>>>>> creating more problems than we are solving. It is still > >>>>>>>> unclear > >>>>>>>>> to > >>>>>>>>>>> me > >>>>>>>>>>>>> how > >>>>>>>>>>>>>>> we expect to handle the errors. Perhaps we could include > >> an > >>>>>>>>>> example? > >>>>>>>>>>>> It > >>>>>>>>>>>>>>> seems like there is a specific use case in mind and maybe > >> we > >>>>>>>> can > >>>>>>>>>>> make > >>>>>>>>>>>> a > >>>>>>>>>>>>>>> design that is tighter and supports that case. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Justine > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> On Tue, Apr 23, 2024 at 3:06 PM Kirk True < > >>>>>>> k...@kirktrue.pro> > >>>>>>>>>>> wrote: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Hi Alieh, > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Thanks for the KIP! > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> A few questions: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> K1. What is the expected behavior for the producer if it > >>>>>>>>>> generates > >>>>>>>>>>> a > >>>>>>>>>>>>>>>> RecordTooLargeException, but the handler returns RETRY? > >>>>>>>>>>>>>>>> K2. How do we determine which Record was responsible for > >>>>>>> the > >>>>>>>>>>>>>>>> UnknownTopicOrPartitionException since we get that > >> response > >>>>>>>>> when > >>>>>>>>>>>>>>> sending a > >>>>>>>>>>>>>>>> batch of records? > >>>>>>>>>>>>>>>> K3. What is the expected behavior if the handle() method > >>>>>>>> itself > >>>>>>>>>>>> throws > >>>>>>>>>>>>> an > >>>>>>>>>>>>>>>> error? > >>>>>>>>>>>>>>>> K4. 
What is the downside of adding an onError() method to > >>>>>>> the > >>>>>>>>>>>>> Producer’s > >>>>>>>>>>>>>>>> Callback interface vs. a new mechanism? > >>>>>>>>>>>>>>>> K5. Can we change “ProducerExceptionHandlerResponse" to > >>>>>>> just > >>>>>>>>>>>> “Response” > >>>>>>>>>>>>>>>> given that it’s an inner enum? > >>>>>>>>>>>>>>>> K6. Any recommendation for callback authors to handle > >>>>>>>> different > >>>>>>>>>>>>> behavior > >>>>>>>>>>>>>>>> for different topics? > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> I’ll echo what others have said, it would help me > >>>>>>> understand > >>>>>>>>> why > >>>>>>>>>> we > >>>>>>>>>>>>> want > >>>>>>>>>>>>>>>> another handler class if there were more examples in the > >>>>>>>>>> Motivation > >>>>>>>>>>>>>>>> section. As it stands now, I agree with Chris that the > >>>>>>> stated > >>>>>>>>>>> issues > >>>>>>>>>>>>>>> could > >>>>>>>>>>>>>>>> be solved by adding two new configuration options: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> oversized.record.behavior=fail > >>>>>>>>>>>>>>>> retry.on.unknown.topic.or.partition=true > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> What I’m not yet able to wrap my head around is: what > >>>>>>> exactly > >>>>>>>>>> would > >>>>>>>>>>>> the > >>>>>>>>>>>>>>>> logic in the handler be? I’m not very imaginative, so I’m > >>>>>>>>>> assuming > >>>>>>>>>>>>> they’d > >>>>>>>>>>>>>>>> mostly be if-this-then-that. However, if they’re more > >>>>>>>>>> complicated, > >>>>>>>>>>>> I’d > >>>>>>>>>>>>>>> have > >>>>>>>>>>>>>>>> other concerns. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>>>>> Kirk > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> On Apr 22, 2024, at 7:38 AM, Alieh Saeedi > >>>>>>>>>>>>> <asae...@confluent.io.INVALID > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Thank you all for the feedback! 
> >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Addressing the main concern: The KIP is about giving the > >>>>>>>> user > >>>>>>>>>> the > >>>>>>>>>>>>>>> ability > >>>>>>>>>>>>>>>>> to handle producer exceptions, but to be more > >> conservative > >>>>>>>> and > >>>>>>>>>>> avoid > >>>>>>>>>>>>>>>> future > >>>>>>>>>>>>>>>>> issues, we decided to limit it to a short list of > >>>>>>>>> exceptions. > >>>>>>>>>> I > >>>>>>>>>>>>>>>> included > >>>>>>>>>>>>>>>>> *RecordTooLargeException* and > >>>>>>>>> *UnknownTopicOrPartitionException*. > >>>>>>>>>>>> Open > >>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>> suggestions for adding some more ;-) > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> KIP Updates: > >>>>>>>>>>>>>>>>> - clarified the way that the user should configure the > >>>>>>>>> Producer > >>>>>>>>>> to > >>>>>>>>>>>> use > >>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>> custom handler. I think adding a producer config > >> property > >>>>>>> is > >>>>>>>>> the > >>>>>>>>>>>>>>> cleanest > >>>>>>>>>>>>>>>>> one. > >>>>>>>>>>>>>>>>> - changed the *ClientExceptionHandler* to > >>>>>>>>>>> *ProducerExceptionHandler* > >>>>>>>>>>>>> to > >>>>>>>>>>>>>>>> be > >>>>>>>>>>>>>>>>> closer to what we are changing. > >>>>>>>>>>>>>>>>> - added the ProducerRecord as the input parameter of the > >>>>>>>>>> handle() > >>>>>>>>>>>>>>> method > >>>>>>>>>>>>>>>> as > >>>>>>>>>>>>>>>>> well. > >>>>>>>>>>>>>>>>> - increased the response types to 3 to have fail and two > >>>>>>>> types > >>>>>>>>>> of > >>>>>>>>>>>>>>>> continue. > >>>>>>>>>>>>>>>>> - The default behaviour is to have no custom handler, > >>>>>>> with > >>>>>>>>> the > >>>>>>>>>>>>>>>>> corresponding config parameter set to null. Therefore, > >> the > >>>>>>>> KIP > >>>>>>>>>>>>> provides > >>>>>>>>>>>>>>>> no > >>>>>>>>>>>>>>>>> default implementation of the interface. 
> >>>>>>>>>>>>>>>>> - We follow the interface solution as described in the > >>>>>>>>>>>>>>>>> Rejected Alternatives section.
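One way to read "no default implementation" is that the producer consults the handler only when one is configured. The minimal sketch below follows that reading and uses hypothetical names (`consult`, `Handler`, `Response`). A handler that throws is treated as FAIL, in line with the "everything fails" answer in the May 3 summary. Treating a null response as FAIL is this sketch's own assumption.

```java
import java.util.Optional;

final class DispatchSketch {
    enum Response { RETRY, SWALLOW, FAIL }

    interface Handler {
        Response handle(String topic, Exception exception);
    }

    // How producer-side code might consult an optional handler (hypothetical).
    // Optional.empty() means "no handler configured: keep the producer's existing
    // built-in handling", so the default stays unchanged without a default class.
    static Optional<Response> consult(Handler handler, String topic, Exception error) {
        if (handler == null) {
            return Optional.empty();
        }
        try {
            Response response = handler.handle(topic, error);
            // Assumption of this sketch: a null response is treated like FAIL.
            return Optional.of(response == null ? Response.FAIL : response);
        } catch (RuntimeException handlerFailure) {
            // A broken handler fails the send instead of being silently ignored.
            System.err.println("producer exception handler threw: " + handlerFailure);
            return Optional.of(Response.FAIL);
        }
    }
}
```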
> >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Cheers, > >>>>>>>>>>>>>>>>> Alieh > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> On Thu, Apr 18, 2024 at 8:11 PM Matthias J. Sax < > >>>>>>>>>> mj...@apache.org > >>>>>>>>>>>> > >>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Thanks for the KIP, Alieh! It addresses an important > >> case > >>>>>>>> for > >>>>>>>>>>> error > >>>>>>>>>>>>>>>>>> handling. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> I agree that using this handler would be an expert API, > >>>>>>> as > >>>>>>>>>>>> mentioned > >>>>>>>>>>>>>>> by > >>>>>>>>>>>>>>>>>> a few people. But I don't think it would be a reason to > >>>>>>> not > >>>>>>>>> add > >>>>>>>>>>> it. > >>>>>>>>>>>>>>> It's > >>>>>>>>>>>>>>>>>> always a tricky tradeoff what to expose to users and to > >>>>>>>> avoid > >>>>>>>>>>> foot > >>>>>>>>>>>>>>> guns, > >>>>>>>>>>>>>>>>>> but we added similar handlers to Kafka Streams, and > >> have > >>>>>>>> good > >>>>>>>>>>>>>>> experience > >>>>>>>>>>>>>>>>>> with it. Hence, I understand, but don't share the > >> concern > >>>>>>>>>> raised. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> I also agree that there is some responsibility on the > >>>>>>> user > >>>>>>>> to > >>>>>>>>>>>>>>> understand > >>>>>>>>>>>>>>>>>> how such a handler should be implemented to not drop > >> data > >>>>>>>> by > >>>>>>>>>>>>> accident. > >>>>>>>>>>>>>>>>>> But it seems unavoidable and acceptable. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> While I understand that a "simpler / reduced" API (e.g., > >> via > >>>>>>>>>>> configs) > >>>>>>>>>>>>>>> might > >>>>>>>>>>>>>>>>>> also work, I personally prefer a full handler.
Configs > >>>>>>> have > >>>>>>>>> the > >>>>>>>>>>>> same > >>>>>>>>>>>>>>>>>> issue that they could be misused, potentially leading > >> to > >>>>>>>>>>>> incorrectly > >>>>>>>>>>>>>>>>>> dropped data, but at the same time are less flexible > >> (and > >>>>>>>>> thus > >>>>>>>>>>>> maybe > >>>>>>>>>>>>>>>>>> even harder to use correctly...?). Based on my > >> experience, > >>>>>>>>> there > >>>>>>>>>>> are > >>>>>>>>>>>>>>> also > >>>>>>>>>>>>>>>>>> often weird corner cases for which it makes sense to also > >>>>>>>> drop > >>>>>>>>>>>> records > >>>>>>>>>>>>>>> for > >>>>>>>>>>>>>>>>>> other exceptions, and a full handler has the advantage > >> of > >>>>>>>>> full > >>>>>>>>>>>>>>>>>> flexibility and "absolute power!". > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> To be fair: I don't know the exact code paths of the > >>>>>>>> producer > >>>>>>>>>> in > >>>>>>>>>>>>>>>>>> detail, so please keep me honest. But my understanding > >>>>>>> is > >>>>>>>>>> that > >>>>>>>>>>>> the > >>>>>>>>>>>>>>> KIP > >>>>>>>>>>>>>>>>>> aims to allow users to react to internal exceptions, and > >>>>>>>>> decide > >>>>>>>>>> to > >>>>>>>>>>>>> keep > >>>>>>>>>>>>>>>>>> retrying internally, swallow the error and drop the > >>>>>>> record, > >>>>>>>>> or > >>>>>>>>>>>> raise > >>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>> error? > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Maybe the KIP would need to be a little bit more > >> precise > >>>>>>>>> about which > >>>>>>>>>>>> errors > >>>>>>>>>>>>>>> we > >>>>>>>>>>>>>>>>>> want to cover -- I don't think this list must be > >>>>>>>> exhaustive, > >>>>>>>>> as > >>>>>>>>>>> we > >>>>>>>>>>>>> can > >>>>>>>>>>>>>>>>>> always do a follow-up KIP to also apply the handler to > >>>>>>> other > >>>>>>>>>> errors > >>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>> expand the scope of the handler.
The KIP does mention > >>>>>>>>> examples, > >>>>>>>>>>> but > >>>>>>>>>>>>> it > >>>>>>>>>>>>>>>>>> might be good to explicitly state in which cases the > >>>>>>>> handler > >>>>>>>>>> is > >>>>>>>>>>>>>>>> applied. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> I am also not sure if CONTINUE and FAIL are enough > >>>>>>> options? > >>>>>>>>>> Don't > >>>>>>>>>>>> we > >>>>>>>>>>>>>>>>>> need three options? Or would `CONTINUE` have a different > >>>>>>>>> meaning > >>>>>>>>>>>>>>> depending > >>>>>>>>>>>>>>>>>> on the type of error? I.e., for a retryable error > >>>>>>> `CONTINUE` > >>>>>>>>>> would > >>>>>>>>>>>> mean > >>>>>>>>>>>>>>>>>> keep retrying internally, but for a non-retryable error > >>>>>>>>>>> `CONTINUE` > >>>>>>>>>>>>>>> means > >>>>>>>>>>>>>>>>>> swallow the error and drop the record? This semantic > >>>>>>>> overload > >>>>>>>>>>> seems > >>>>>>>>>>>>>>>>>> tricky for users to reason about, so it might be better to > >>>>>>>> split > >>>>>>>>>>>>>>> `CONTINUE` > >>>>>>>>>>>>>>>>>> into two cases -> `RETRY` and `SWALLOW` (or some better > >>>>>>>>> names). > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Additionally, should we just ship a > >>>>>>>>>>> `DefaultClientExceptionHandler` > >>>>>>>>>>>>>>>>>> which would return `FAIL`, for backward compatibility? > >> Or > >>>>>>>>> not > >>>>>>>>>>>> have > >>>>>>>>>>>>>>> any > >>>>>>>>>>>>>>>>>> default handler to begin with and allow it to be > >> `null`? > >>>>>>> I > >>>>>>>>>> don't > >>>>>>>>>>>> see > >>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>> need for a specific `TransactionExceptionHandler`. To > >> me, > >>>>>>>> the > >>>>>>>>>>> goal > >>>>>>>>>>>>>>>>>> should be to not modify the default behavior at all, > >> but > >>>>>>> to > >>>>>>>>>> just > >>>>>>>>>>>>> allow > >>>>>>>>>>>>>>>>>> users to change the default behavior if there is a > >> need.
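The semantic overload described above can be made explicit in a few lines of code. The sketch assumes hypothetical types. `LegacyResponse` is the earlier two-valued enum, `Response` is the split version, and `DefaultClientExceptionHandler` borrows the name suggested above for an always-FAIL default.

```java
final class ResponseSplitSketch {
    // Earlier two-valued shape: CONTINUE means different things per error type.
    enum LegacyResponse { CONTINUE, FAIL }

    // Split shape: every value has exactly one meaning.
    enum Response { RETRY, SWALLOW, FAIL }

    // What CONTINUE would implicitly have to mean, which is the overload at issue.
    static Response interpret(LegacyResponse legacy, boolean retriable) {
        if (legacy == LegacyResponse.FAIL) {
            return Response.FAIL;
        }
        return retriable ? Response.RETRY : Response.SWALLOW;
    }

    interface Handler {
        Response handle(String topic, Exception exception);
    }

    // The always-FAIL default floated above (hypothetical class).
    static final class DefaultClientExceptionHandler implements Handler {
        @Override
        public Response handle(String topic, Exception exception) {
            return Response.FAIL;
        }
    }
}
```

With three explicit values, the caller no longer needs to know whether an error is retriable to understand what the handler asked for.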
> >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> What is missing from the KIP, though, is how the handler > >> is > >>>>>>>>> passed > >>>>>>>>>>>> into > >>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>> producer. Would we need a new config which > >> allows > >>>>>>>> setting > >>>>>>>>>> a > >>>>>>>>>>> > >>>>>>>>>>>>>>>>>> custom handler? And/or would we allow passing in an > >>>>>>>> instance > >>>>>>>>>> via > >>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>> constructor or add a new method to set a handler? > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> -Matthias > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> On 4/18/24 10:02 AM, Andrew Schofield wrote: > >>>>>>>>>>>>>>>>>>> Hi Alieh, > >>>>>>>>>>>>>>>>>>> Thanks for the KIP. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Exception handling in the Kafka producer and consumer > >> is > >>>>>>>>>> really > >>>>>>>>>>>> not > >>>>>>>>>>>>>>>>>> ideal. > >>>>>>>>>>>>>>>>>>> It’s even harder working out what’s going on with the > >>>>>>>>>> consumer. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> I’m a bit nervous about this KIP and I agree with > >> Chris > >>>>>>>> that > >>>>>>>>>> it > >>>>>>>>>>>>> could > >>>>>>>>>>>>>>>> do > >>>>>>>>>>>>>>>>>> with additional > >>>>>>>>>>>>>>>>>>> motivation. This would be an expert-level interface > >>>>>>> given > >>>>>>>>> how > >>>>>>>>>>>>>>>> complicated > >>>>>>>>>>>>>>>>>>> the exception handling for Kafka has become. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> 7. The application is not really aware of the batching > >>>>>>>> being > >>>>>>>>>>> done > >>>>>>>>>>>> on > >>>>>>>>>>>>>>>> its > >>>>>>>>>>>>>>>>>> behalf. > >>>>>>>>>>>>>>>>>>> The ProduceResponse can actually return an array of > >>>>>>>> records > >>>>>>>>>>> which > >>>>>>>>>>>>>>>> failed > >>>>>>>>>>>>>>>>>>> per batch.
If you get RecordTooLargeException, and > >> want > >>>>>>> to > >>>>>>>>>>> retry, > >>>>>>>>>>>>> you > >>>>>>>>>>>>>>>>>> probably > >>>>>>>>>>>>>>>>>>> need to remove the offending records from the batch > >> and > >>>>>>>>> retry > >>>>>>>>>>> it. > >>>>>>>>>>>>>>> This > >>>>>>>>>>>>>>>>>> is getting fiddly. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> 8. There is already o.a.k.clients.producer.Callback. I > >>>>>>>>> wonder > >>>>>>>>>>>>> whether > >>>>>>>>>>>>>>>> an > >>>>>>>>>>>>>>>>>>> alternative might be to add a method to the existing > >>>>>>>>> Callback > >>>>>>>>>>>>>>>> interface, > >>>>>>>>>>>>>>>>>> such as: > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> ClientExceptionResponse onException(Exception > >>>>>>> exception) > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> It would be called when a ProduceResponse contains an > >>>>>>>> error, > >>>>>>>>>> but > >>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>> producer is going to retry. It tells the producer > >>>>>>> whether > >>>>>>>> to > >>>>>>>>>> go > >>>>>>>>>>>>> ahead > >>>>>>>>>>>>>>>>>> with the retry > >>>>>>>>>>>>>>>>>>> or not. The default implementation would be to > >> CONTINUE, > >>>>>>>>>> because > >>>>>>>>>>>>>>> that’s > >>>>>>>>>>>>>>>>>>> just continuing to retry as planned. Note that this > >> is a > >>>>>>>>>>>> per-record > >>>>>>>>>>>>>>>>>> callback, so > >>>>>>>>>>>>>>>>>>> the application would be able to understand which > >>>>>>> records > >>>>>>>>>>> failed. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> By using an existing interface, we already know how to > >>>>>>>>>> configure > >>>>>>>>>>>> it > >>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>> we know > >>>>>>>>>>>>>>>>>>> about the threading model for calling it. 
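Point 8 suggests extending the existing Callback interface instead of adding a new mechanism. The sketch below shows that alternative using a stand-in interface, not the real `org.apache.kafka.clients.producer.Callback`. The real `onCompletion` takes a `RecordMetadata`; `Object` is used here to avoid the Kafka dependency. `onException` and `ClientExceptionResponse` are the names from the suggestion above and are hypothetical. `shouldRetry` is invented for illustration.

```java
final class CallbackSketch {
    // Response values from the earlier draft of the KIP.
    enum ClientExceptionResponse { CONTINUE, FAIL }

    // Stand-in for org.apache.kafka.clients.producer.Callback.
    interface ProducerCallback {
        void onCompletion(Object metadata, Exception exception);

        // Hypothetical addition: asked before the producer retries a failed send.
        // Defaulting to CONTINUE keeps today's behaviour (keep retrying).
        default ClientExceptionResponse onException(Exception exception) {
            return ClientExceptionResponse.CONTINUE;
        }
    }

    // How producer-side retry code might consult the callback (hypothetical).
    static boolean shouldRetry(ProducerCallback callback, Exception error) {
        return callback.onException(error) == ClientExceptionResponse.CONTINUE;
    }
}
```

Because the new method is a `default` method, the interface keeps a single abstract method, so existing lambda callbacks would still compile.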
Thanks,
Andrew

On 17 Apr 2024, at 18:17, Chris Egerton <chr...@aiven.io.INVALID> wrote:

Hi Alieh,

Thanks for the KIP! The issue with writing to non-existent topics is particularly frustrating for users of Kafka Connect and has been the source of a handful of Jira tickets over the years. My thoughts:

1. An additional detail we can add to the motivation (or possibly rejected alternatives) section is that this kind of custom retry logic can't be implemented by hand by, e.g., setting retries to 0 in the producer config and handling exceptions at the application level. Or rather, it can, but 1) it's a bit painful to have to reimplement at every call site for Producer::send (and any code that awaits the returned Future), and 2) it's impossible to do this without losing idempotency on retries.

2. That said, I wonder if a pluggable interface is really the right call here. Documenting the interactions of a producer with a ClientExceptionHandler instance will be tricky, and implementing them will also be a fair amount of work. I believe that there needs to be some more granularity for how writes to non-existent topics (or really, UNKNOWN_TOPIC_OR_PARTITION and related errors from the broker) are handled, but I'm torn between keeping it simple with maybe one or two new producer config properties, or a full-blown pluggable interface. If there are more cases that would benefit from a pluggable interface, it would be nice to identify these and add them to the KIP to strengthen the motivation. Right now, I'm not sure the two that are mentioned in the motivation are sufficient.

3. Alternatively, a possible compromise is for this KIP to introduce new properties that dictate how to handle unknown-topic-partition and record-too-large errors, with the thinking that if we introduce a pluggable interface later on, these properties will be recognized by the default implementation of that interface but could be completely ignored or replaced by alternative implementations.

4. (Nit) You can remove the "This page is meant as a template for writing a KIP..." part from the KIP. It's not a template anymore :)

5. If we do go the pluggable interface route, wouldn't we want to add the possibility for retry logic? The simplest version of this could be to add a RETRY value to the ClientExceptionHandlerResponse enum.

6. I think "SKIP" or "DROP" might be clearer than "CONTINUE" for the ClientExceptionHandlerResponse enum, since they cause records to be dropped.
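A minimal Java sketch of points 3, 5 and 6 taken together: a response enum that uses SKIP instead of CONTINUE and adds RETRY, plus a default handler that reads two configuration properties while leaving alternative implementations free to ignore them. The property names, the `ErrorKind` enum, the retained `FAIL` value and the default choices are illustrative assumptions, not settled KIP-1038 or Kafka APIs.

```java
import java.util.Locale;
import java.util.Map;

public class ExceptionHandlerSketch {

    // SKIP replaces CONTINUE (point 6) and RETRY is added (point 5); keeping FAIL is an assumption.
    enum ClientExceptionHandlerResponse { FAIL, SKIP, RETRY }

    // Hypothetical categories for the two error types named in point 3.
    enum ErrorKind { UNKNOWN_TOPIC_OR_PARTITION, RECORD_TOO_LARGE }

    // Hypothetical pluggable interface; not an existing Kafka API.
    interface ClientExceptionHandler {
        ClientExceptionHandlerResponse handle(ErrorKind kind);
    }

    // Hypothetical property names, used only for illustration.
    static final String UNKNOWN_TOPIC_KEY = "unknown.topic.partition.response";
    static final String RECORD_TOO_LARGE_KEY = "record.too.large.response";

    // Default implementation: honours the two properties, with defaults that roughly mirror
    // today's behaviour (unknown topics are retried, oversized records fail).
    static final class DefaultHandler implements ClientExceptionHandler {
        private final Map<String, String> config;

        DefaultHandler(Map<String, String> config) {
            this.config = config;
        }

        @Override
        public ClientExceptionHandlerResponse handle(ErrorKind kind) {
            if (kind == ErrorKind.UNKNOWN_TOPIC_OR_PARTITION) {
                return parse(config.getOrDefault(UNKNOWN_TOPIC_KEY, "RETRY"));
            }
            return parse(config.getOrDefault(RECORD_TOO_LARGE_KEY, "FAIL"));
        }

        // Case-insensitive parsing; an unknown value throws IllegalArgumentException.
        private static ClientExceptionHandlerResponse parse(String value) {
            return ClientExceptionHandlerResponse.valueOf(value.trim().toUpperCase(Locale.ROOT));
        }
    }

    public static void main(String[] args) {
        ClientExceptionHandler defaults = new DefaultHandler(Map.of());
        System.out.println(defaults.handle(ErrorKind.UNKNOWN_TOPIC_OR_PARTITION)); // prints RETRY
        System.out.println(defaults.handle(ErrorKind.RECORD_TOO_LARGE));           // prints FAIL

        ClientExceptionHandler configured = new DefaultHandler(Map.of(RECORD_TOO_LARGE_KEY, "skip"));
        System.out.println(configured.handle(ErrorKind.RECORD_TOO_LARGE));         // prints SKIP

        // An alternative implementation is free to ignore the properties entirely.
        ClientExceptionHandler strict = kind -> ClientExceptionHandlerResponse.FAIL;
        System.out.println(strict.handle(ErrorKind.UNKNOWN_TOPIC_OR_PARTITION));   // prints FAIL
    }
}
```

Keeping the properties inside the default implementation, rather than in the producer itself, is what would let a later pluggable interface replace them without changing how existing configurations are interpreted.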
Cheers,

Chris

On Wed, Apr 17, 2024 at 12:25 PM Justine Olshan <jols...@confluent.io.invalid> wrote:

Hey Alieh,

I echo what Omnia says: I'm not sure I understand the implications of the change, and I think more detail is needed.

This comment also confused me a bit:

    * {@code ClientExceptionHandler} that continues the transaction even if a record is too large.
    * Otherwise, it makes the transaction to fail.

Relatedly, I've been working with some folks on a KIP for transaction errors and how they are handled. Specifically for the RecordTooLargeException (and a few other errors), we want to give a new error category for this error that allows the application to choose how it is handled. Maybe this KIP is something that you are looking for? Stay tuned :)

Justine

On Wed, Apr 17, 2024 at 8:03 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:

Hi Alieh,
Thanks for the KIP! I have a couple of comments:
- You mentioned in the KIP motivation,

> Another example for which a production exception handler could be useful is if a user tries to write into a non-existing topic, which returns a retryable error code; with infinite retries, the producer would hang retrying forever. A handler could help to break the infinite retry loop.

How can the handler differentiate between something that is temporary, where it should keep retrying, and something permanent, like forgetting to create the topic?
Temporary cases here could be:
  - the producer gets deployed before topic creation finishes (especially if topic creation is handled via IaC)
  - temporarily offline partitions
  - leadership changes
Isn’t this putting the producer at risk of dropping records unintentionally?
- Can you elaborate more on what is written in the compatibility / migration plan section, please, by explaining in a bit more detail what the changing behaviour is and how this will impact clients who are upgrading?
- In the proposed changes, can you elaborate in the KIP where in the producer lifecycle ClientExceptionHandler and TransactionExceptionHandler will get triggered, and how the producer will configure them to point to a custom implementation?

Thanks
Omnia

On 17 Apr 2024, at 13:13, Alieh Saeedi <asae...@confluent.io.INVALID> wrote:

Hi all,
Here is KIP-1038: Add Custom Error Handler to Producer.
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-1038%3A+Add+Custom+Error+Handler+to+Producer>

I look forward to your feedback!

Cheers,
Alieh