Alieh and Chris, Thanks for clarifying 1) but I saw the motivation. I guess I just didn't understand how that would be ensured on the producer side. I saw the sample code -- is it just an if statement checking for the error before the handler is invoked? That seems a bit fragile.
Can you clarify what you mean by `since the code does not reach the KS interface and breaks somewhere in producer.` If we surfaced this error to the application in a better way would that also be a solution to the issue? Justine On Tue, May 7, 2024 at 1:55 PM Alieh Saeedi <asae...@confluent.io.invalid> wrote: > Hi, > > > Thank you, Chris and Justine, for the feedback. > > > @Chris > > 1) Flexibility: it has two meanings. The first meaning is the one you > mentioned. We are going to cover more exceptions in the future, but as > Justine mentioned, we must be very conservative about adding more > exceptions. Additionally, flexibility mainly means that the user is able to > develop their own code. As mentioned in the motivation section and the > examples, sometimes the user decides on dropping a record based on the > topic, for example. > > > 2) Defining two separate methods for retriable and non-retriable > exceptions: although the idea is brilliant, the user may still make a > mistake by implementing the wrong method and see a non-expecting behaviour. > For example, he may implement handleRetriable() for RecordTooLargeException > and define SWALLOW for the exception, but in practice, he sees no change in > default behaviour since he implemented the wrong method. I think we can > never reduce the user’s mistakes to 0. > > > > 3) Default implementation for Handler: the default behaviour is already > preserved with NO need of implementing any handler or setting the > corresponding config parameter `custom.exception.handler`. What you mean is > actually having a second default, which requires having both interface and > config parameters. About UnknownTopicOrPartitionException: the producer > already offers the config parameter `max.block.ms` which determines the > duration of retrying. The main purpose of the user who needs the handler is > to get the root cause of TimeoutException and handle it in the way he > intends. The KIP explains the necessity of it for KS users. > > > 4) Naming issue: By SWALLOW, we meant actually swallow the error, while > SKIP means skip the record, I think. If it makes sense for more ppl, I can > change it to SKIP > > > @Justine > > 1) was addressed by Chris. > > 2 and 3) The problem is exactly what you mentioned. Currently, there is no > way to handle these issues application-side. Even KS users who implement KS > ProductionExceptionHandler are not able to handle the exceptions as they > intend since the code does not reach the KS interface and breaks somewhere > in producer. > > Cheers, > Alieh > > On Tue, May 7, 2024 at 8:43 PM Chris Egerton <fearthecel...@gmail.com> > wrote: > > > Hi Justine, > > > > The method signatures for the interface are indeed open-ended, but the > KIP > > states that its uses will be limited. See the motivation section: > > > > > We believe that the user should be able to develop custom exception > > handlers for managing producer exceptions. On the other hand, this will > be > > an expert-level API, and using that may result in strange behaviour in > the > > system, making it hard to find the root cause. Therefore, the custom > > handler is currently limited to handling RecordTooLargeException and > > UnknownTopicOrPartitionException. > > > > Cheers, > > > > Chris > > > > > > On Tue, May 7, 2024, 14:37 Justine Olshan <jols...@confluent.io.invalid> > > wrote: > > > > > Hi Alieh, > > > > > > I was out for KSB and then was also sick. :( > > > > > > To your point 1) Chris, I don't think it is limited to two specific > > > scenarios, since the interface accepts a generic Exception e and can be > > > implemented to check if that e is an instanceof any exception. I didn't > > see > > > anywhere that specific errors are enforced. I'm a bit concerned about > > this > > > actually. I'm concerned about the opened-endedness and the contract we > > have > > > with transactions. We are allowing the client to make decisions that > are > > > somewhat invisible to the server. As an aside, can we build in log > > messages > > > when the handler decides to skip etc a message. I'm really concerned > > about > > > messages being silently dropped. > > > > > > I do think Chris's point 2) about retriable vs non retriable errors is > > > fair. I'm a bit concerned about skipping a unknown topic or partition > > > exception too early, as there are cases where it can be transient. > > > > > > I'm still a little bit wary of allowing dropping records as part of EOS > > > generally as in many cases, these errors signify an issue with the > > original > > > data. I understand that streams and connect/mirror maker may have > reasons > > > they want to progress past these messages, but wondering if there is a > > way > > > that can be done application-side. I'm willing to accept this sort of > > > proposal if we can make it clear that this sort of thing is happening > and > > > we limit the blast radius for what we can do. > > > > > > Justine > > > > > > On Tue, May 7, 2024 at 9:55 AM Chris Egerton <chr...@aiven.io.invalid> > > > wrote: > > > > > > > Hi Alieh, > > > > > > > > Sorry for the delay, I've been out sick. I still have some thoughts > > that > > > > I'd like to see addressed before voting. > > > > > > > > 1) If flexibility is the motivation for a pluggable interface, why > are > > we > > > > only limiting the uses for this interface to two very specific > > scenarios? > > > > Why not also allow, e.g., authorization errors to be handled as well > > > > (allowing users to drop records destined for some off-limits topics, > or > > > > retry for a limited duration in case there's a delay in the > propagation > > > of > > > > ACL updates)? It'd be nice to see some analysis of other errors that > > > could > > > > be handled with this new API, both to avoid the follow-up work of > > another > > > > KIP to address them in the future, and to make sure that we're not > > > painting > > > > ourselves into a corner with the current API in a way that would make > > > > future modifications difficult. > > > > > > > > 2) Something feels a bit off with how retriable vs. non-retriable > > errors > > > > are handled with the interface. Why not introduce two separate > methods > > to > > > > handle each case separately? That way there's no ambiguity or > implicit > > > > behavior when, e.g., attempting to retry on a > RecordTooLargeException. > > > This > > > > could be something like `NonRetriableResponse handle(ProducerRecord, > > > > Exception)` and `RetriableResponse handleRetriable(ProducerRecord, > > > > Exception)`, though the exact names and shape can obviously be toyed > > > with a > > > > bit. > > > > > > > > 3) Although the flexibility of a pluggable interface may benefit some > > > > users' custom producer applications and Kafka Streams applications, > it > > > > comes at significant deployment cost for other low-/no-code > > environments, > > > > including but not limited to Kafka Connect and MirrorMaker 2. Can we > > add > > > a > > > > default implementation of the exception handler that allows for some > > > simple > > > > behavior to be tweaked via configuration property? Two things that > > would > > > be > > > > nice to have would be A) an upper bound on the retry time for > > > > unknown-topic-partition exceptions and B) an option to drop records > > that > > > > are large enough to trigger a record-too-large exception. > > > > > > > > 4) I'd still prefer to see "SKIP" or "DROP" instead of the proposed > > > > "SWALLOW" option, which IMO is opaque and non-obvious, especially > when > > > > trying to guess the behavior for retriable errors. > > > > > > > > Cheers, > > > > > > > > Chris > > > > > > > > On Fri, May 3, 2024 at 11:23 AM Alieh Saeedi > > > <asae...@confluent.io.invalid > > > > > > > > > wrote: > > > > > > > > > Hi all, > > > > > > > > > > > > > > > A summary of the KIP and the discussions: > > > > > > > > > > > > > > > The KIP introduces a handler interface for Producer in order to > > handle > > > > two > > > > > exceptions: RecordTooLargeException and > > > UnknownTopicOrPartitionException. > > > > > The handler handles the exceptions per-record. > > > > > > > > > > > > > > > - Do we need this handler? [Motivation and Examples sections] > > > > > > > > > > > > > > > RecordTooLargeException: 1) In transactions, the producer collects > > > > multiple > > > > > records in batches. Then a RecordTooLargeException related to a > > single > > > > > record leads to failing the entire batch. A custom exception > handler > > in > > > > > this case may decide on dropping the record and continuing the > > > > processing. > > > > > See Example 1, please. 2) More over, in Kafka Streams, a record > that > > is > > > > too > > > > > large is a poison pill record, and there is no way to skip over > it. A > > > > > handler would allow us to react to this error inside the producer, > > > i.e., > > > > > local to where the error happens, and thus simplify the overall > code > > > > > significantly. Please read the Motivation section for more > > explanation. > > > > > > > > > > > > > > > UnknownTopicOrPartitionException: For this case, the producer > handles > > > > this > > > > > exception internally and only issues a WARN log about missing > > metadata > > > > and > > > > > retries internally. Later, when the producer hits " > > deliver.timeout.ms" > > > > it > > > > > throws a TimeoutException, and the user can only blindly retry, > which > > > may > > > > > result in an infinite retry loop. The thrown TimeoutException > "cuts" > > > the > > > > > connection to the underlying root cause of missing metadata (which > > > could > > > > > indeed be a transient error but is persistent for a non-existing > > > topic). > > > > > Thus, there is no programmatic way to break the infinite retry > loop. > > > > Kafka > > > > > Streams also blindly retries for this case, and the application > gets > > > > stuck. > > > > > > > > > > > > > > > > > > > > - Having interface vs configuration option: [Motivation, Examples, > > and > > > > > Rejected Alternatives sections] > > > > > > > > > > Our solution is introducing an interface due to the full > flexibility > > > that > > > > > it offers. Sometimes users, especially Kafka Streams ones, > determine > > > the > > > > > handler's behaviour based on the situation. For example, f > > > > > acing UnknownTopicOrPartitionException*, *the user may want to > raise > > an > > > > > error for some topics but retry it for other topics. Having a > > > > configuration > > > > > option with a fixed set of possibilities does not serve the user's > > > > > needs. See Example 2, please. > > > > > > > > > > > > > > > - Note on RecordTooLargeException: [Public Interfaces section] > > > > > > > > > > If the custom handler decides on SWALLOW for > RecordTooLargeException, > > > > then > > > > > this record will not be a part of the batch of transactions and > will > > > also > > > > > not be sent to the broker in non-transactional mode. So no worries > > > about > > > > > getting a RecordTooLargeException from the broker in this case, as > > the > > > > > record will never ever be sent to the broker. SWALLOW means drop > the > > > > record > > > > > and continue/swallow the error. > > > > > > > > > > > > > > > - What if the handle() method implements RETRY for > > > > RecordTooLargeException? > > > > > [Proposed Changes section] > > > > > > > > > > We have to limit the user to only have FAIL or SWALLOW for > > > > > RecordTooLargeException. Actually, RETRY must be equal to FAIL. > This > > is > > > > > well documented/informed in javadoc. > > > > > > > > > > > > > > > > > > > > - What if the handle() method of the handler throws an exception? > > > > > > > > > > The handler is expected to have correct code. If it throws an > > > exception, > > > > > everything fails. > > > > > > > > > > > > > > > > > > > > This is a PoC PR <https://github.com/apache/kafka/pull/15846> ONLY > > for > > > > > RecordTooLargeException. The code changes related to > > > > > UnknownTopicOrPartitionException will be added to this PR LATER. > > > > > > > > > > > > > > > Looking forward to your feedback again. > > > > > > > > > > > > > > > Cheers, > > > > > > > > > > Alieh > > > > > > > > > > On Thu, Apr 25, 2024 at 11:46 PM Kirk True <k...@kirktrue.pro> > > wrote: > > > > > > > > > > > Hi Alieh, > > > > > > > > > > > > Thanks for the updates! > > > > > > > > > > > > Comments inline... > > > > > > > > > > > > > > > > > > > On Apr 25, 2024, at 1:10 PM, Alieh Saeedi > > > > <asae...@confluent.io.INVALID > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > Hi all, > > > > > > > > > > > > > > Thanks a lot for the constructive feedbacks! > > > > > > > > > > > > > > > > > > > > > > > > > > > > Addressing some of the main concerns: > > > > > > > > > > > > > > > > > > > > > - The `RecordTooLargeException` can be thrown by broker, > producer > > > and > > > > > > > consumer. Of course, the `ProducerExceptionHandler` interface > is > > > > > > introduced > > > > > > > to affect only the exceptions thrown from the producer. This > KIP > > > very > > > > > > > specifically means to provide a possibility to manage the > > > > > > > `RecordTooLargeException` thrown from the Producer.send() > method. > > > > > Please > > > > > > > see “Proposed Changes” section for more clarity. I investigated > > the > > > > > issue > > > > > > > there thoroughly. I hope it can explain the concern about how > we > > > > handle > > > > > > the > > > > > > > errors as well. > > > > > > > > > > > > > > > > > > > > > > > > > > > > - The problem with Callback: Methods of Callback are called > when > > > the > > > > > > record > > > > > > > sent to the server is acknowledged, while this is not the > desired > > > > time > > > > > > for > > > > > > > all exceptions. We intend to handle exceptions beforehand. > > > > > > > > > > > > I guess it makes sense to keep the expectation for when Callback > is > > > > > > invoked as-is vs. shoehorning more into it. > > > > > > > > > > > > > - What if the custom handler returns RETRY for > > > > > > `RecordTooLargeException`? I > > > > > > > assume changing the producer configuration at runtime is > > possible. > > > If > > > > > so, > > > > > > > RETRY for a too large record is valid because maybe in the next > > > try, > > > > > the > > > > > > > too large record is not poisoning any more. I am not 100% sure > > > about > > > > > the > > > > > > > technical details, though. Otherwise, we can consider the RETRY > > as > > > > FAIL > > > > > > for > > > > > > > this exception. Another solution would be to consider a > constant > > > > number > > > > > > of > > > > > > > times for RETRY which can be useful for other exceptions as > well. > > > > > > > > > > > > It’s not presently possible to change the configuration of an > > > existing > > > > > > Producer at runtime. So if a record hits a > RecordTooLargeException > > > > once, > > > > > no > > > > > > amount of retrying (with the current Producer) will change that > > fact. > > > > So > > > > > > I’m still a little stuck on how to handle a response of RETRY for > > an > > > > > > “oversized” record. > > > > > > > > > > > > > - What if the handle() method itself throws an exception? I > think > > > > > > > rationally and pragmatically, the behaviour must be exactly > like > > > when > > > > > no > > > > > > > custom handler is defined since the user actually did not have > a > > > > > working > > > > > > > handler. > > > > > > > > > > > > I’m not convinced that ignoring an errant handler is the right > > > choice. > > > > It > > > > > > then becomes a silent failure that might have repercussions, > > > depending > > > > on > > > > > > the business logic. A user would have to proactively trawls > through > > > the > > > > > > logs for WARN/ERROR messages to catch it. > > > > > > > > > > > > Throwing a hard error is pretty draconian, though… > > > > > > > > > > > > > - Why not use config parameters instead of an interface? As > > > explained > > > > > in > > > > > > > the “Rejected Alternatives” section, we assume that the handler > > > will > > > > be > > > > > > > used for a greater number of exceptions in the future. > Defining a > > > > > > > configuration parameter for each exception may make the > > > > configuration a > > > > > > bit > > > > > > > messy. Moreover, the handler offers more flexibility. > > > > > > > > > > > > Agreed that the logic-via-configuration approach is weird and > > > limiting. > > > > > > Forget I ever suggested it ;) > > > > > > > > > > > > I’d think additional background in the Motivation section would > > help > > > me > > > > > > understand how users might use this feature beyond a) skipping > > > > > “oversized” > > > > > > records, and b) not retrying missing topics. > > > > > > > > > > > > > Small change: > > > > > > > > > > > > > > -ProductionExceptionHandlerResponse -> Response for brevity and > > > > > > simplicity. > > > > > > > Could’ve been HandlerResponse too I think! > > > > > > > > > > > > The name change sounds good to me. > > > > > > > > > > > > Thanks Alieh! > > > > > > > > > > > > > > > > > > > > > > > > > > > I thank you all again for your useful questions/suggestions. > > > > > > > > > > > > > > I would be happy to hear more of your concerns, as stated in > some > > > > > > feedback. > > > > > > > > > > > > > > Cheers, > > > > > > > Alieh > > > > > > > > > > > > > > On Wed, Apr 24, 2024 at 12:31 AM Justine Olshan > > > > > > > <jols...@confluent.io.invalid> wrote: > > > > > > > > > > > > > >> Thanks Alieh for the updates. > > > > > > >> > > > > > > >> I'm a little concerned about the design pattern here. It seems > > > like > > > > we > > > > > > want > > > > > > >> specific usages, but we are packaging it as a generic handler. > > > > > > >> I think we tried to narrow down on the specific errors we want > > to > > > > > > handle, > > > > > > >> but it feels a little clunky as we have a generic thing for > two > > > > > specific > > > > > > >> errors. > > > > > > >> > > > > > > >> I'm wondering if we are using the right patterns to solve > these > > > > > > problems. I > > > > > > >> agree though that we will need something more than the error > > > classes > > > > > I'm > > > > > > >> proposing if we want to have different handling be > configurable. > > > > > > >> My concern is that the open-endedness of a handler means that > we > > > are > > > > > > >> creating more problems than we are solving. It is still > unclear > > to > > > > me > > > > > > how > > > > > > >> we expect to handle the errors. Perhaps we could include an > > > example? > > > > > It > > > > > > >> seems like there is a specific use case in mind and maybe we > can > > > > make > > > > > a > > > > > > >> design that is tighter and supports that case. > > > > > > >> > > > > > > >> Justine > > > > > > >> > > > > > > >> On Tue, Apr 23, 2024 at 3:06 PM Kirk True <k...@kirktrue.pro> > > > > wrote: > > > > > > >> > > > > > > >>> Hi Alieh, > > > > > > >>> > > > > > > >>> Thanks for the KIP! > > > > > > >>> > > > > > > >>> A few questions: > > > > > > >>> > > > > > > >>> K1. What is the expected behavior for the producer if it > > > generates > > > > a > > > > > > >>> RecordTooLargeException, but the handler returns RETRY? > > > > > > >>> K2. How do we determine which Record was responsible for the > > > > > > >>> UnknownTopicOrPartitionException since we get that response > > when > > > > > > >> sending a > > > > > > >>> batch of records? > > > > > > >>> K3. What is the expected behavior if the handle() method > itself > > > > > throws > > > > > > an > > > > > > >>> error? > > > > > > >>> K4. What is the downside of adding an onError() method to the > > > > > > Producer’s > > > > > > >>> Callback interface vs. a new mechanism? > > > > > > >>> K5. Can we change “ProducerExceptionHandlerResponse" to just > > > > > “Response” > > > > > > >>> given that it’s an inner enum? > > > > > > >>> K6. Any recommendation for callback authors to handle > different > > > > > > behavior > > > > > > >>> for different topics? > > > > > > >>> > > > > > > >>> I’ll echo what others have said, it would help me understand > > why > > > we > > > > > > want > > > > > > >>> another handler class if there were more examples in the > > > Motivation > > > > > > >>> section. As it stands now, I agree with Chris that the stated > > > > issues > > > > > > >> could > > > > > > >>> be solved by adding two new configuration options: > > > > > > >>> > > > > > > >>> oversized.record.behavior=fail > > > > > > >>> retry.on.unknown.topic.or.partition=true > > > > > > >>> > > > > > > >>> What I’m not yet able to wrap my head around is: what exactly > > > would > > > > > the > > > > > > >>> logic in the handler be? I’m not very imaginative, so I’m > > > assuming > > > > > > they’d > > > > > > >>> mostly be if-this-then-that. However, if they’re more > > > complicated, > > > > > I’d > > > > > > >> have > > > > > > >>> other concerns. > > > > > > >>> > > > > > > >>> Thanks, > > > > > > >>> Kirk > > > > > > >>> > > > > > > >>>> On Apr 22, 2024, at 7:38 AM, Alieh Saeedi > > > > > > <asae...@confluent.io.INVALID > > > > > > >>> > > > > > > >>> wrote: > > > > > > >>>> > > > > > > >>>> Thank you all for the feedback! > > > > > > >>>> > > > > > > >>>> Addressing the main concern: The KIP is about giving the > user > > > the > > > > > > >> ability > > > > > > >>>> to handle producer exceptions, but to be more conservative > and > > > > avoid > > > > > > >>> future > > > > > > >>>> issues, we decided to be limited to a short list of > > exceptions. > > > I > > > > > > >>> included > > > > > > >>>> *RecordTooLargeExceptin* and > > *UnknownTopicOrPartitionException. > > > > > *Open > > > > > > >> to > > > > > > >>>> suggestion for adding some more ;-) > > > > > > >>>> > > > > > > >>>> KIP Updates: > > > > > > >>>> - clarified the way that the user should configure the > > Producer > > > to > > > > > use > > > > > > >>> the > > > > > > >>>> custom handler. I think adding a producer config property is > > the > > > > > > >> cleanest > > > > > > >>>> one. > > > > > > >>>> - changed the *ClientExceptionHandler* to > > > > *ProducerExceptionHandler* > > > > > > to > > > > > > >>> be > > > > > > >>>> closer to what we are changing. > > > > > > >>>> - added the ProducerRecord as the input parameter of the > > > handle() > > > > > > >> method > > > > > > >>> as > > > > > > >>>> well. > > > > > > >>>> - increased the response types to 3 to have fail and two > types > > > of > > > > > > >>> continue. > > > > > > >>>> - The default behaviour is having no custom handler, having > > the > > > > > > >>>> corresponding config parameter set to null. Therefore, the > KIP > > > > > > provides > > > > > > >>> no > > > > > > >>>> default implementation of the interface. > > > > > > >>>> - We follow the interface solution as described in the > > > > > > >>>> Rejected Alternetives section. > > > > > > >>>> > > > > > > >>>> > > > > > > >>>> Cheers, > > > > > > >>>> Alieh > > > > > > >>>> > > > > > > >>>> > > > > > > >>>> On Thu, Apr 18, 2024 at 8:11 PM Matthias J. Sax < > > > mj...@apache.org > > > > > > > > > > > >>> wrote: > > > > > > >>>> > > > > > > >>>>> Thanks for the KIP Alieh! It addresses an important case > for > > > > error > > > > > > >>>>> handling. > > > > > > >>>>> > > > > > > >>>>> I agree that using this handler would be an expert API, as > > > > > mentioned > > > > > > >> by > > > > > > >>>>> a few people. But I don't think it would be a reason to not > > add > > > > it. > > > > > > >> It's > > > > > > >>>>> always a tricky tradeoff what to expose to users and to > avoid > > > > foot > > > > > > >> guns, > > > > > > >>>>> but we added similar handlers to Kafka Streams, and have > good > > > > > > >> experience > > > > > > >>>>> with it. Hence, I understand, but don't share the concern > > > raised. > > > > > > >>>>> > > > > > > >>>>> I also agree that there is some responsibility by the user > to > > > > > > >> understand > > > > > > >>>>> how such a handler should be implemented to not drop data > by > > > > > > accident. > > > > > > >>>>> But it seem unavoidable and acceptable. > > > > > > >>>>> > > > > > > >>>>> While I understand that a "simpler / reduced" API (eg via > > > > configs) > > > > > > >> might > > > > > > >>>>> also work, I personally prefer a full handler. Configs have > > the > > > > > same > > > > > > >>>>> issue that they could be miss-used potentially leading to > > > > > incorrectly > > > > > > >>>>> dropped data, but at the same time are less flexible (and > > thus > > > > > maybe > > > > > > >>>>> ever harder to use correctly...?). Base on my experience, > > there > > > > is > > > > > > >> also > > > > > > >>>>> often weird corner case for which it make sense to also > drop > > > > > records > > > > > > >> for > > > > > > >>>>> other exceptions, and a full handler has the advantage of > > full > > > > > > >>>>> flexibility and "absolute power!". > > > > > > >>>>> > > > > > > >>>>> To be fair: I don't know the exact code paths of the > producer > > > in > > > > > > >>>>> details, so please keep me honest. But my understanding is, > > > that > > > > > the > > > > > > >> KIP > > > > > > >>>>> aims to allow users to react to internal exception, and > > decide > > > to > > > > > > keep > > > > > > >>>>> retrying internally, swallow the error and drop the record, > > or > > > > > raise > > > > > > >> the > > > > > > >>>>> error? > > > > > > >>>>> > > > > > > >>>>> Maybe the KIP would need to be a little bit more precises > > what > > > > > error > > > > > > >> we > > > > > > >>>>> want to cover -- I don't think this list must be > exhaustive, > > as > > > > we > > > > > > can > > > > > > >>>>> always do follow up KIP to also apply the handler to other > > > errors > > > > > to > > > > > > >>>>> expand the scope of the handler. The KIP does mention > > examples, > > > > but > > > > > > it > > > > > > >>>>> might be good to explicitly state for what cases the > handler > > > gets > > > > > > >>> applied? > > > > > > >>>>> > > > > > > >>>>> I am also not sure if CONTINUE and FAIL are enough options? > > > Don't > > > > > we > > > > > > >>>>> need three options? Or would `CONTINUE` have different > > meaning > > > > > > >> depending > > > > > > >>>>> on the type of error? Ie, for a retryable error `CONTINUE` > > > would > > > > > mean > > > > > > >>>>> keep retrying internally, but for a non-retryable error > > > > `CONTINUE` > > > > > > >> means > > > > > > >>>>> swallow the error and drop the record? This semantic > overload > > > > seems > > > > > > >>>>> tricky to reason about by users, so it might better to > split > > > > > > >> `CONTINUE` > > > > > > >>>>> into two cases -> `RETRY` and `SWALLOW` (or some better > > names). > > > > > > >>>>> > > > > > > >>>>> Additionally, should we just ship a > > > > `DefaultClientExceptionHandler` > > > > > > >>>>> which would return `FAIL`, for backward compatibility. Or > > don't > > > > > have > > > > > > >> any > > > > > > >>>>> default handler to begin with and allow it to be `null`? I > > > don't > > > > > see > > > > > > >> the > > > > > > >>>>> need for a specific `TransactionExceptionHandler`. To me, > the > > > > goal > > > > > > >>>>> should be to not modify the default behavior at all, but to > > > just > > > > > > allow > > > > > > >>>>> users to change the default behavior if there is a need. > > > > > > >>>>> > > > > > > >>>>> What is missing on the KIP though it, how the handler is > > passed > > > > > into > > > > > > >> the > > > > > > >>>>> producer thought? Would we need a new config which allows > to > > > set > > > > a > > > > > > >>>>> custom handler? And/or would we allow to pass in an > instance > > > via > > > > > the > > > > > > >>>>> constructor or add a new method to set a handler? > > > > > > >>>>> > > > > > > >>>>> > > > > > > >>>>> -Matthias > > > > > > >>>>> > > > > > > >>>>> On 4/18/24 10:02 AM, Andrew Schofield wrote: > > > > > > >>>>>> Hi Alieh, > > > > > > >>>>>> Thanks for the KIP. > > > > > > >>>>>> > > > > > > >>>>>> Exception handling in the Kafka producer and consumer is > > > really > > > > > not > > > > > > >>>>> ideal. > > > > > > >>>>>> It’s even harder working out what’s going on with the > > > consumer. > > > > > > >>>>>> > > > > > > >>>>>> I’m a bit nervous about this KIP and I agree with Chris > that > > > it > > > > > > could > > > > > > >>> do > > > > > > >>>>> with additional > > > > > > >>>>>> motivation. This would be an expert-level interface given > > how > > > > > > >>> complicated > > > > > > >>>>>> the exception handling for Kafka has become. > > > > > > >>>>>> > > > > > > >>>>>> 7. The application is not really aware of the batching > being > > > > done > > > > > on > > > > > > >>> its > > > > > > >>>>> behalf. > > > > > > >>>>>> The ProduceResponse can actually return an array of > records > > > > which > > > > > > >>> failed > > > > > > >>>>>> per batch. If you get RecordTooLargeException, and want to > > > > retry, > > > > > > you > > > > > > >>>>> probably > > > > > > >>>>>> need to remove the offending records from the batch and > > retry > > > > it. > > > > > > >> This > > > > > > >>>>> is getting fiddly. > > > > > > >>>>>> > > > > > > >>>>>> 8. There is already o.a.k.clients.producer.Callback. I > > wonder > > > > > > whether > > > > > > >>> an > > > > > > >>>>>> alternative might be to add a method to the existing > > Callback > > > > > > >>> interface, > > > > > > >>>>> such as: > > > > > > >>>>>> > > > > > > >>>>>> ClientExceptionResponse onException(Exception exception) > > > > > > >>>>>> > > > > > > >>>>>> It would be called when a ProduceResponse contains an > error, > > > but > > > > > the > > > > > > >>>>>> producer is going to retry. It tells the producer whether > to > > > go > > > > > > ahead > > > > > > >>>>> with the retry > > > > > > >>>>>> or not. The default implementation would be to CONTINUE, > > > because > > > > > > >> that’s > > > > > > >>>>>> just continuing to retry as planned. Note that this is a > > > > > per-record > > > > > > >>>>> callback, so > > > > > > >>>>>> the application would be able to understand which records > > > > failed. > > > > > > >>>>>> > > > > > > >>>>>> By using an existing interface, we already know how to > > > configure > > > > > it > > > > > > >> and > > > > > > >>>>> we know > > > > > > >>>>>> about the threading model for calling it. > > > > > > >>>>>> > > > > > > >>>>>> > > > > > > >>>>>> Thanks, > > > > > > >>>>>> Andrew > > > > > > >>>>>> > > > > > > >>>>>> > > > > > > >>>>>> > > > > > > >>>>>>> On 17 Apr 2024, at 18:17, Chris Egerton > > > > <chr...@aiven.io.INVALID > > > > > > > > > > > > >>>>> wrote: > > > > > > >>>>>>> > > > > > > >>>>>>> Hi Alieh, > > > > > > >>>>>>> > > > > > > >>>>>>> Thanks for the KIP! The issue with writing to > non-existent > > > > topics > > > > > > is > > > > > > >>>>>>> particularly frustrating for users of Kafka Connect and > has > > > > been > > > > > > the > > > > > > >>>>> source > > > > > > >>>>>>> of a handful of Jira tickets over the years. My thoughts: > > > > > > >>>>>>> > > > > > > >>>>>>> 1. An additional detail we can add to the motivation (or > > > > possibly > > > > > > >>>>> rejected > > > > > > >>>>>>> alternatives) section is that this kind of custom retry > > logic > > > > > can't > > > > > > >> be > > > > > > >>>>>>> implemented by hand by, e.g., setting retries to 0 in the > > > > > producer > > > > > > >>>>> config > > > > > > >>>>>>> and handling exceptions at the application level. Or > > rather, > > > it > > > > > > can, > > > > > > >>>>> but 1) > > > > > > >>>>>>> it's a bit painful to have to reimplement at every > > call-site > > > > for > > > > > > >>>>>>> Producer::send (and any code that awaits the returned > > Future) > > > > and > > > > > > 2) > > > > > > >>>>> it's > > > > > > >>>>>>> impossible to do this without losing idempotency on > > retries. > > > > > > >>>>>>> > > > > > > >>>>>>> 2. That said, I wonder if a pluggable interface is really > > the > > > > > right > > > > > > >>> call > > > > > > >>>>>>> here. Documenting the interactions of a producer with > > > > > > >>>>>>> a ClientExceptionHandler instance will be tricky, and > > > > > implementing > > > > > > >>> them > > > > > > >>>>>>> will also be a fair amount of work. I believe that there > > > needs > > > > to > > > > > > be > > > > > > >>>>> some > > > > > > >>>>>>> more granularity for how writes to non-existent topics > (or > > > > > really, > > > > > > >>>>>>> UNKNOWN_TOPIC_OR_PARTITION and related errors from the > > > broker) > > > > > are > > > > > > >>>>> handled, > > > > > > >>>>>>> but I'm torn between keeping it simple with maybe one or > > two > > > > new > > > > > > >>>>> producer > > > > > > >>>>>>> config properties, or a full-blown pluggable interface. > If > > > > there > > > > > > are > > > > > > >>>>> more > > > > > > >>>>>>> cases that would benefit from a pluggable interface, it > > would > > > > be > > > > > > >> nice > > > > > > >>> to > > > > > > >>>>>>> identify these and add them to the KIP to strengthen the > > > > > > motivation. > > > > > > >>>>> Right > > > > > > >>>>>>> now, I'm not sure the two that are mentioned in the > > > motivation > > > > > are > > > > > > >>>>>>> sufficient. > > > > > > >>>>>>> > > > > > > >>>>>>> 3. Alternatively, a possible compromise is for this KIP > to > > > > > > introduce > > > > > > >>> new > > > > > > >>>>>>> properties that dictate how to handle > > unknown-topic-partition > > > > and > > > > > > >>>>>>> record-too-large errors, with the thinking that if we > > > > introduce a > > > > > > >>>>> pluggable > > > > > > >>>>>>> interface later on, these properties will be recognized > by > > > the > > > > > > >> default > > > > > > >>>>>>> implementation of that interface but could be completely > > > > ignored > > > > > or > > > > > > >>>>>>> replaced by alternative implementations. > > > > > > >>>>>>> > > > > > > >>>>>>> 4. (Nit) You can remove the "This page is meant as a > > template > > > > for > > > > > > >>>>> writing a > > > > > > >>>>>>> KIP..." part from the KIP. It's not a template anymore :) > > > > > > >>>>>>> > > > > > > >>>>>>> 5. If we do go the pluggable interface route, wouldn't we > > > want > > > > to > > > > > > >> add > > > > > > >>>>> the > > > > > > >>>>>>> possibility for retry logic? The simplest version of this > > > could > > > > > be > > > > > > >> to > > > > > > >>>>> add a > > > > > > >>>>>>> RETRY value to the ClientExceptionHandlerResponse enum. > > > > > > >>>>>>> > > > > > > >>>>>>> 6. I think "SKIP" or "DROP" might be clearer instead of > > > > > "CONTINUE" > > > > > > >> for > > > > > > >>>>>>> the ClientExceptionHandlerResponse enum, since they cause > > > > records > > > > > > to > > > > > > >>> be > > > > > > >>>>>>> dropped. > > > > > > >>>>>>> > > > > > > >>>>>>> Cheers, > > > > > > >>>>>>> > > > > > > >>>>>>> Chris > > > > > > >>>>>>> > > > > > > >>>>>>> On Wed, Apr 17, 2024 at 12:25 PM Justine Olshan > > > > > > >>>>>>> <jols...@confluent.io.invalid> wrote: > > > > > > >>>>>>> > > > > > > >>>>>>>> Hey Alieh, > > > > > > >>>>>>>> > > > > > > >>>>>>>> I echo what Omnia says, I'm not sure I understand the > > > > > implications > > > > > > >> of > > > > > > >>>>> the > > > > > > >>>>>>>> change and I think more detail is needed. > > > > > > >>>>>>>> > > > > > > >>>>>>>> This comment also confused me a bit: > > > > > > >>>>>>>> * {@code ClientExceptionHandler} that continues the > > > > transaction > > > > > > >> even > > > > > > >>>>> if a > > > > > > >>>>>>>> record is too large. > > > > > > >>>>>>>> * Otherwise, it makes the transaction to fail. > > > > > > >>>>>>>> > > > > > > >>>>>>>> Relatedly, I've been working with some folks on a KIP > for > > > > > > >>> transactions > > > > > > >>>>>>>> errors and how they are handled. Specifically for the > > > > > > >>>>>>>> RecordTooLargeException (and a few other errors), we > want > > to > > > > > give > > > > > > a > > > > > > >>> new > > > > > > >>>>>>>> error category for this error that allows the > application > > to > > > > > > choose > > > > > > >>>>> how it > > > > > > >>>>>>>> is handled. Maybe this KIP is something that you are > > looking > > > > > for? > > > > > > >>> Stay > > > > > > >>>>>>>> tuned :) > > > > > > >>>>>>>> > > > > > > >>>>>>>> Justine > > > > > > >>>>>>>> > > > > > > >>>>>>>> > > > > > > >>>>>>>> > > > > > > >>>>>>>> > > > > > > >>>>>>>> > > > > > > >>>>>>>> On Wed, Apr 17, 2024 at 8:03 AM Omnia Ibrahim < > > > > > > >>> o.g.h.ibra...@gmail.com > > > > > > >>>>>> > > > > > > >>>>>>>> wrote: > > > > > > >>>>>>>> > > > > > > >>>>>>>>> Hi Alieh, > > > > > > >>>>>>>>> Thanks for the KIP! I have couple of comments > > > > > > >>>>>>>>> - You mentioned in the KIP motivation, > > > > > > >>>>>>>>>> Another example for which a production exception > handler > > > > could > > > > > > be > > > > > > >>>>>>>> useful > > > > > > >>>>>>>>> is if a user tries to write into a non-existing topic, > > > which > > > > > > >>> returns a > > > > > > >>>>>>>>> retryable error code; with infinite retries, the > producer > > > > would > > > > > > >> hang > > > > > > >>>>>>>>> retrying forever. A handler could help to break the > > > infinite > > > > > > retry > > > > > > >>>>> loop. > > > > > > >>>>>>>>> > > > > > > >>>>>>>>> How the handler can differentiate between something > that > > is > > > > > > >>> temporary > > > > > > >>>>> and > > > > > > >>>>>>>>> it should keep retrying and something permanent like > > forgot > > > > to > > > > > > >>> create > > > > > > >>>>> the > > > > > > >>>>>>>>> topic? temporary here could be > > > > > > >>>>>>>>> the producer get deployed before the topic creation > > finish > > > > > > >>> (specially > > > > > > >>>>> if > > > > > > >>>>>>>>> the topic creation is handled via IaC) > > > > > > >>>>>>>>> temporary offline partitions > > > > > > >>>>>>>>> leadership changing > > > > > > >>>>>>>>> Isn’t this putting the producer at risk of > dropping > > > > > records > > > > > > >>>>>>>>> unintentionally? > > > > > > >>>>>>>>> - Can you elaborate more on what is written in the > > > > > compatibility > > > > > > / > > > > > > >>>>>>>>> migration plan section please by explaining in bit more > > > > details > > > > > > >> what > > > > > > >>>>> is > > > > > > >>>>>>>> the > > > > > > >>>>>>>>> changing behaviour and how this will impact client who > > are > > > > > > >>> upgrading? > > > > > > >>>>>>>>> - In the proposal changes can you elaborate in the KIP > > > where > > > > in > > > > > > >> the > > > > > > >>>>>>>>> producer lifecycle will ClientExceptionHandler and > > > > > > >>>>>>>>> TransactionExceptionHandler get triggered, and how will > > the > > > > > > >> producer > > > > > > >>>>>>>>> configure them to point to costumed implementation. > > > > > > >>>>>>>>> > > > > > > >>>>>>>>> Thanks > > > > > > >>>>>>>>> Omnia > > > > > > >>>>>>>>> > > > > > > >>>>>>>>>> On 17 Apr 2024, at 13:13, Alieh Saeedi > > > > > > >>> <asae...@confluent.io.INVALID > > > > > > >>>>>> > > > > > > >>>>>>>>> wrote: > > > > > > >>>>>>>>>> > > > > > > >>>>>>>>>> Hi all, > > > > > > >>>>>>>>>> Here is the KIP-1038: Add Custom Error Handler to > > > Producer. > > > > > > >>>>>>>>>> < > > > > > > >>>>>>>>> > > > > > > >>>>>>>> > > > > > > >>>>> > > > > > > >>> > > > > > > >> > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1038%3A+Add+Custom+Error+Handler+to+Producer > > > > > > >>>>>>>>>> > > > > > > >>>>>>>>>> I look forward to your feedback! > > > > > > >>>>>>>>>> > > > > > > >>>>>>>>>> Cheers, > > > > > > >>>>>>>>>> Alieh > > > > > > >>>>>>>>> > > > > > > >>>>>>>>> > > > > > > >>>>>>>> > > > > > > >>>>>> > > > > > > >>>>> > > > > > > >>> > > > > > > >>> > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > >