Thanks Andrew. Done :) @Chris: I changed the type of the retry config parameter from boolean to integer, so it now defines the timeout for retrying. As you mentioned, reusing `max.block.ms` was not reasonable.
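For readers following along, here is a minimal sketch of the handler shape the thread converges on: the `RetriableResponse`/`NonRetriableResponse` naming Andrew suggests, one default `handle()` method per exception, and `Closeable` as a superinterface. It is an illustration, not the KIP's actual interface. The stand-in `ProducerRecord` and exception classes, the enum constants, and the default return values (FAIL for oversized records, RETRY for unknown topics, assumed to mirror current producer behavior) are assumptions made so the sketch compiles on its own.

```java
import java.io.Closeable;

// Hypothetical stand-ins for the real Kafka classes so the sketch compiles on its own.
class ProducerRecord<K, V> {
    final String topic;
    final K key;
    final V value;

    ProducerRecord(String topic, K key, V value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }
}

class RecordTooLargeException extends RuntimeException {
    RecordTooLargeException(String message) { super(message); }
}

class UnknownTopicOrPartitionException extends RuntimeException {
    UnknownTopicOrPartitionException(String message) { super(message); }
}

// Assumed constants: a non-retriable error can only fail or be swallowed.
enum NonRetriableResponse { FAIL, SWALLOW }

// Assumed constants: a retriable error may also be retried.
enum RetriableResponse { FAIL, SWALLOW, RETRY }

// One method per exception, each with a default assumed to mirror current producer behavior.
interface ProducerExceptionHandler extends Closeable {
    default NonRetriableResponse handle(RecordTooLargeException e, ProducerRecord<?, ?> record) {
        return NonRetriableResponse.FAIL;
    }

    default RetriableResponse handle(UnknownTopicOrPartitionException e, ProducerRecord<?, ?> record) {
        return RetriableResponse.RETRY;
    }

    // Nothing to release by default; narrows Closeable.close() so it throws no checked exception.
    @Override
    default void close() {}
}

// Example implementation: drop oversized records, keep the default for everything else.
class DropLargeRecordsHandler implements ProducerExceptionHandler {
    @Override
    public NonRetriableResponse handle(RecordTooLargeException e, ProducerRecord<?, ?> record) {
        return NonRetriableResponse.SWALLOW;
    }
}
```

Because `DropLargeRecordsHandler` overrides only one method, the unknown-topic case keeps its default, and the return type of each method limits which responses are even expressible for that error.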
So if the KIP looks good, let's skip to the good part ;-) VOTING :) Bests, Alieh On Tue, May 14, 2024 at 12:26 PM Andrew Schofield <andrew_schofi...@live.com> wrote: > Hi Alieh, > Just one final comment. > > [AJS5] Existing classes use Retriable, not Retryable. For example: > > https://kafka.apache.org/21/javadoc/org/apache/kafka/common/errors/RetriableException.html > > I suggest RetriableResponse and NonRetriableResponse. > > Thanks, > Andrew > > > On 13 May 2024, at 23:17, Alieh Saeedi <asae...@confluent.io.INVALID> > wrote: > > > > Hi all, > > > > > > Thanks for all the valid points you listed. > > > > > > KIP updates and addressing concerns: > > > > > > 1) The KIP now suggests two Response types: `RetryableResponse` and > > `NonRetryableResponse` > > > > > > 2) `custom.exception.handler` is changed to > `custom.exception.handler.class` > > > > > > 3) The KIP clarifies that `In the case of an implemented handler for the > > specified exception, the handler takes precedence.` > > > > > > 4) There is now a `default` implementation for both handle() methods. > > > > > > 5) @Chris: for `UnknownTopicOrPartition`, the default is already > retrying > > for 60s. (In fact, the default value of `max.block.ms`). If the handler > > instructs to FAIL or SWALLOW, there will be no retry, and if the handler > > instructs to RETRY, that will be the default behavior, which follows the > > values in already existing config parameters such as `max.block.ms`. > Does > > that make sense? > > > > > > Hope the changes and explanations are convincing :) > > > > > > Cheers, > > > > Alieh > > > > On Mon, May 13, 2024 at 6:40 PM Justine Olshan > <jols...@confluent.io.invalid> > > wrote: > > > >> Oh I see. The type isn't the error type but a newly defined type for the > >> response. Makes sense and works for me.
> >> > >> Justine > >> > >> On Mon, May 13, 2024 at 9:13 AM Chris Egerton <fearthecel...@gmail.com> > >> wrote: > >> > >>> If we have dedicated methods for each kind of exception > >>> (handleRecordTooLarge, handleUnknownTopicOrPartition, etc.), doesn't > that > >>> provide sufficient constraint? I'm not suggesting we eliminate these > >>> methods, just that we change their return types to something more > >> flexible. > >>> > >>> On Mon, May 13, 2024, 12:07 Justine Olshan > <jols...@confluent.io.invalid > >>> > >>> wrote: > >>> > >>>> I'm not sure I agree with the Retriable and NonRetriableResponse > >> comment. > >>>> This doesn't limit the blast radius or enforce certain errors are > used. > >>>> I think we might disagree on how controlled these interfaces can be... > >>>> > >>>> Justine > >>>> > >>>> On Mon, May 13, 2024 at 8:40 AM Chris Egerton <chr...@aiven.io.invalid > >>> > >>>> wrote: > >>>> > >>>>> Hi Alieh, > >>>>> > >>>>> Thanks for the updates! I just have a few more thoughts: > >>>>> > >>>>> - I don't think a boolean property is sufficient to dictate retries > >> for > >>>>> unknown topic partitions, though. These errors can occur if a topic > >> has > >>>>> just been created, which can occur if, for example, automatic topic > >>>>> creation is enabled for a multi-task connector. This is why I > >> proposed > >>> a > >>>>> timeout instead of a boolean (and see my previous email for why > >>> reducing > >>>>> max.block.ms for a producer is not a viable alternative). 
If it > >> helps, > >>>> one > >>>>> way to reproduce this yourself is to add the line > >>>>> `fooProps.put(TASKS_MAX_CONFIG, "10");` to the integration test here: > >>>>> > >>>>> > >>>> > >>> > >> > https://github.com/apache/kafka/blob/5439914c32fa00d634efa7219699f1bc21add839/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java#L134 > >>>>> and then check the logs afterward for messages like "Error while > >>> fetching > >>>>> metadata with correlation id <n> : > >>>> {foo-topic=UNKNOWN_TOPIC_OR_PARTITION}". > >>>>> > >>>>> - I also don't think we need custom XxxResponse enums for every > >>> possible > >>>>> method; it seems like this will lead to a lot of duplication and > >>>> cognitive > >>>>> overhead if we want to expand the error handler in the future. > >>> Something > >>>>> more flexible like RetriableResponse and NonRetriableResponse could > >>>>> suffice. > >>>>> > >>>>> - Finally, the KIP still doesn't state how the handler will or won't > >>> take > >>>>> precedence over existing retry properties. If I set `retries` or ` > >>>>> delivery.timeout.ms` or `max.block.ms` to low values, will that > >> cause > >>>>> retries to cease even if my custom handler would otherwise keep > >>> returning > >>>>> RETRY for an error? > >>>>> > >>>>> Cheers, > >>>>> > >>>>> Chris > >>>>> > >>>>> On Mon, May 13, 2024 at 11:02 AM Andrew Schofield < > >>>>> andrew_schofi...@live.com> > >>>>> wrote: > >>>>> > >>>>>> Hi Alieh, > >>>>>> Just a few more comments on the KIP. It is looking much less risky > >>> now > >>>>> the > >>>>>> scope > >>>>>> is tighter. > >>>>>> > >>>>>> [AJS1] It would be nice to have default implementations of the > >> handle > >>>>>> methods > >>>>>> so an implementor would not need to implement both themselves. > >>>>>> > >>>>>> [AJS2] Producer configurations which are class names usually end in > >>>>>> “.class”. > >>>>>> I suggest “custom.exception.handler.class”. 
> >>>>>> > >>>>>> [AJS3] If I implemented a handler, and I set a non-default value > >> for > >>>> one > >>>>>> of the > >>>>>> new configurations, what happens? I would expect that the handler > >>> takes > >>>>>> precedence. I wasn’t quite clear what “the control will follow the > >>>>> handler > >>>>>> instructions” meant. > >>>>>> > >>>>>> [AJS4] Because you now have an enum for the > >>>>>> RecordTooLargeExceptionResponse, > >>>>>> I don’t think you need to state in the comment for > >>>>>> ProducerExceptionHandler that > >>>>>> RETRY will be interpreted as FAIL. > >>>>>> > >>>>>> Thanks, > >>>>>> Andrew > >>>>>> > >>>>>>> On 13 May 2024, at 14:53, Alieh Saeedi > >>> <asae...@confluent.io.INVALID > >>>>> > >>>>>> wrote: > >>>>>>> > >>>>>>> Hi all, > >>>>>>> > >>>>>>> > >>>>>>> Thanks for the very interesting discussion during my PTO. > >>>>>>> > >>>>>>> > >>>>>>> KIP updates and addressing concerns: > >>>>>>> > >>>>>>> > >>>>>>> 1) Two handle() methods are defined in ProducerExceptionHandler > >> for > >>>> the > >>>>>> two > >>>>>>> exceptions with different input parameters so that we have > >>>>>>> handle(RecordTooLargeException e, ProducerRecord record) and > >>>>>>> handle(UnknownTopicOrPartitionException e, ProducerRecord record) > >>>>>>> > >>>>>>> > >>>>>>> 2) The ProducerExceptionHandler extends `Closeable` as well. > >>>>>>> > >>>>>>> > >>>>>>> 3) The KIP suggests having two more configuration parameters with > >>>>> boolean > >>>>>>> values: > >>>>>>> > >>>>>>> - `drop.invalid.large.records` with a default value of `false` > >> for > >>>>>>> swallowing records that are too large. > >>>>>>> > >>>>>>> - `retry.unknown.topic.partition` with a default value of `true` > >>> that > >>>>>>> performs RETRY for `max.block.ms` ms when encountering the > >>>>>>> UnknownTopicOrPartitionException. > >>>>>>> > >>>>>>> > >>>>>>> Hope the main concerns are addressed so that we can go forward > >> with > >>>>>> voting.
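Artem's [AL1] concern, quoted further down, is easiest to see with a single generic callback. The sketch below is hypothetical: `GenericHandler`, `Response`, and the stand-in exception classes are not part of the KIP. It contrasts a catch-all implementation, which silently swallows an exception type added in a later client version, with the style Matthias recommends further down: explicit checks for the known cases and a single default that returns FAIL.

```java
// Hypothetical generic callback: one method for every exception type.
enum Response { FAIL, SWALLOW, RETRY }

interface GenericHandler {
    Response handle(RuntimeException e);
}

// Stand-ins for exception types; FutureClientError represents a type added in a later release.
class TooLargeError extends RuntimeException {}
class MissingTopicError extends RuntimeException {}
class FutureClientError extends RuntimeException {}

// Catch-all: correct for the two known cases, but it also swallows anything added later.
class SwallowEverything implements GenericHandler {
    @Override
    public Response handle(RuntimeException e) {
        return Response.SWALLOW;
    }
}

// Explicit checks for the known cases, with a single default that fails loudly.
class ExplicitChecks implements GenericHandler {
    @Override
    public Response handle(RuntimeException e) {
        if (e instanceof TooLargeError || e instanceof MissingTopicError) {
            return Response.SWALLOW;
        }
        return Response.FAIL;
    }
}
```

Per-exception methods, as adopted in the later revisions of the KIP, sidestep this: a new exception type gets its own method instead of falling into an existing catch-all.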
> >>>>>>> > >>>>>>> > >>>>>>> Cheers, > >>>>>>> > >>>>>>> Alieh > >>>>>>> > >>>>>>> On Thu, May 9, 2024 at 11:25 PM Artem Livshits > >>>>>>> <alivsh...@confluent.io.invalid> wrote: > >>>>>>> > >>>>>>>> Hi Matthias, > >>>>>>>> > >>>>>>>>> [AL1] While I see the point, I would think having a different > >>>>> callback > >>>>>>>> for every exception might not really be elegant? > >>>>>>>> > >>>>>>>> I'm not sure how to assess the level of elegance of the > >> proposal, > >>>> but > >>>>> I > >>>>>> can > >>>>>>>> comment on the technical characteristics: > >>>>>>>> > >>>>>>>> 1. Having specific interfaces that codify the logic that is > >>>> currently > >>>>>>>> prescribed in the comments reduces the chance of making a > >> mistake. > >>>>>>>> Comments may get ignored, misunderstood, etc., but if the > >>> contract > >>>> is > >>>>>>>> codified, the compiler will help to enforce the contract. > >>>>>>>> 2. Given that the logic is trickier than it seems (the > >>>>> record-too-large > >>>>>> is > >>>>>>>> an example that can easily confuse someone who's not intimately > >>>>> familiar > >>>>>>>> with the nuances of the batching logic), having a few more > >>> hoops > >>>> to > >>>>>> jump through > >>>>>>>> would give a greater chance that whoever tries to add a new > >> case > >>>>> pauses > >>>>>>>> and thinks a bit more. > >>>>>>>> 3. As Justine pointed out, having different methods will be a > >>> forcing > >>>>>>>> function to go through a KIP rather than smuggling new cases > >> through > >>>>>>>> the implementation. > >>>>>>>> 4. Sort of a consequence of the previous 3 -- all those things > >>>> reduce > >>>>>> the > >>>>>>>> chance of someone writing code that works with 2 errors and > >>> then, > >>>>>> when > >>>>>>>> more errors are added in the future, suddenly and incorrectly > >>> ignores > >>>>> new > >>>>>>>> errors (the example I gave in the previous email). > >>>>>>>> > >>>>>>>>> [AL2 cont.]
Similar to AL1, I see such a handler to some extent > >>> as > >>>>>>>> business logic. If a user puts a bad filter condition in their > >> KS > >>>> app, > >>>>>> and > >>>>>>>> drops messages > >>>>>>>> > >>>>>>>> I agree that there is always a chance of a bug that loses > >>>> messages, > >>>>>> but > >>>>>>>> there is generally a separation of concerns with different > >> risk > >>>>>> profiles: > >>>>>>>> the filtering logic may be more rigorously tested and rarely > >>> changed > >>>>>> (say > >>>>>>>> an application developer does it), but setting the topics to > >>> produce to > >>>>>> may be > >>>>>>>> done via configuration (e.g. a user of the application does it) > >>> and > >>>>> it's > >>>>>>>> generally an expectation that users would get an error when > >>>>>> configuration > >>>>>>>> is incorrect. > >>>>>>>> > >>>>>>>> What could be worse is that UnknownTopicOrPartitionException can > >>> be > >>>> an > >>>>>>>> intermittent error, i.e. with a generally correct configuration, > >>>> there > >>>>>>>> could be a metadata propagation problem on the cluster and then a > >>>> random > >>>>>> set > >>>>>>>> of records could get lost. > >>>>>>>> > >>>>>>>>> [AL3] Maybe I misunderstand what you are saying, but to me, > >>>> checking > >>>>>> the > >>>>>>>> size of the record upfront is exactly what the KIP proposes? No? > >>>>>>>> > >>>>>>>> It achieves the same result but solves it differently. My > >>> proposal: > >>>>>>>> > >>>>>>>> 1. Application checks the validity of a record (maybe via a new > >>>>>>>> validateRecord method) before producing it, and can just exclude > >>> it > >>>> or > >>>>>>>> return an error to the user. > >>>>>>>> 2. Application produces the record -- at this point there are no > >>>>> records > >>>>>>>> that could return record-too-large; they were either skipped at > >>>> step 1 > >>>>>> or > >>>>>>>> we didn't get here because step 1 failed. > >>>>>>>> > >>>>>>>> Vs. the KIP's proposal: > >>>>>>>> > >>>>>>>> 1.
Application produces the record. > >>>>>>>> 2. Application gets a callback. > >>>>>>>> 3. Application returns the action on how to proceed. > >>>>>>>> > >>>>>>>> The advantage of the former is the clarity of semantics -- the > >>>> record > >>>>> is > >>>>>>>> invalid (property of the record, not a function of server state > >> or > >>>>>> server > >>>>>>>> configuration) and we can clearly know that it is the record > >> that > >>> is > >>>>> bad > >>>>>>>> and can never succeed. > >>>>>>>> > >>>>>>>> The KIP-proposed way actually has a very tricky point: it > >> actually > >>>>>> handles > >>>>>>>> a subset of record-too-large exceptions. The broker can return > >>>>>>>> record-too-large and reject the whole batch (but we don't want > >> to > >>>>> ignore > >>>>>>>> those because then we can skip random records that just happened > >>> to > >>>> be > >>>>>> in > >>>>>>>> the same batch), in some sense we use the same error for 2 > >>> different > >>>>>>>> conditions and understanding that requires pretty deep > >>> understanding > >>>>> of > >>>>>>>> Kafka internals. > >>>>>>>> > >>>>>>>> -Artem > >>>>>>>> > >>>>>>>> > >>>>>>>> On Wed, May 8, 2024 at 9:47 AM Justine Olshan > >>>>>> <jols...@confluent.io.invalid > >>>>>>>>> > >>>>>>>> wrote: > >>>>>>>> > >>>>>>>>> My concern with respect to it being fragile: the code that > >>> ensures > >>>>> the > >>>>>>>>> error type is internal to the producer. Someone may see it and > >>>> say, I > >>>>>>>> want > >>>>>>>>> to add such and such error. This looks like internal code, so I > >>>> don't > >>>>>>>> need > >>>>>>>>> a KIP, and then they can change it to whatever they want > >> thinking > >>>> it > >>>>> is > >>>>>>>>> within the typical kafka improvement protocol. > >>>>>>>>> > >>>>>>>>> Relying on an internal change to enforce an external API is > >>> fragile > >>>>> in > >>>>>> my > >>>>>>>>> opinion. 
That's why I sort of agreed with Artem about enforcing > >>> the > >>>>>> error > >>>>>>>> in > >>>>>>>>> the method signature -- part of the public API. > >>>>>>>>> > >>>>>>>>> Chris's comments on requiring more information in the handler again > >>>> make > >>>>>> me > >>>>>>>>> wonder if we are solving a problem of lack of information at > >> the > >>>>>>>>> application level with a more powerful solution than we need. > >>> (I.e., > >>>> if > >>>>>> we > >>>>>>>>> had more information, could the application close and restart > >> the > >>>>>>>>> transaction rather than having to drop records?) But I am happy > >> to > >>>>>>>>> compromise with a handler that we can agree is sufficiently > >>>>> controlled > >>>>>>>> and > >>>>>>>>> documented. > >>>>>>>>> > >>>>>>>>> Justine > >>>>>>>>> > >>>>>>>>> On Wed, May 8, 2024 at 7:20 AM Chris Egerton > >>>> <chr...@aiven.io.invalid > >>>>>> > >>>>>>>>> wrote: > >>>>>>>>> > >>>>>>>>>> Hi Alieh, > >>>>>>>>>> > >>>>>>>>>> Continuing prior discussions: > >>>>>>>>>> > >>>>>>>>>> 1) Regarding the "flexibility" discussion, my overarching > >> point > >>> is > >>>>>>>> that I > >>>>>>>>>> don't see the point in allowing for this kind of pluggable > >> logic > >>>>>>>> without > >>>>>>>>>> also covering more scenarios. Take example 2 in the KIP: if > >>> we're > >>>>>> going > >>>>>>>>> to > >>>>>>>>>> implement retries only on "important" topics when a topic > >>>> partition > >>>>>>>> isn't > >>>>>>>>>> found, why wouldn't we also want to be able to do this for > >> other > >>>>>>>> errors? > >>>>>>>>>> Again, taking authorization errors as an example, why wouldn't > >>> we > >>>>> want > >>>>>>>> to > >>>>>>>>>> be able to fail when we can't write to "important" topics > >>> because > >>>>> the > >>>>>>>>>> producer principal lacks sufficient ACLs, and drop the record > >> if > >>>> the > >>>>>>>>> topic > >>>>>>>>>> isn't "important"?
In a security-conscious environment with > >>>>>>>>>> runtime-dependent topic routing (which is a common feature of > >>> many > >>>>>>>> source > >>>>>>>>>> connectors, such as the Debezium connectors), this seems > >> fairly > >>>>>> likely. > >>>>>>>>>> > >>>>>>>>>> 2) As far as changing the shape of the API goes, I like > >> Artem's > >>>> idea > >>>>>> of > >>>>>>>>>> splitting out the interface based on specific exceptions. This > >>> may > >>>>> be > >>>>>> a > >>>>>>>>>> little laborious to expand in the future, but if we really > >> want > >>> to > >>>>>>>>>> limit the exceptions that we cover with the handler and move > >>>> slowly > >>>>>> and > >>>>>>>>>> cautiously, then IMO it'd be reasonable to reflect that in the > >>>>>>>>> interface. I > >>>>>>>>>> also acknowledge that there's no way to completely prevent > >>> people > >>>>> from > >>>>>>>>>> shooting themselves in the foot by implementing the API > >>>> incorrectly, > >>>>>>>> but > >>>>>>>>> I > >>>>>>>>>> think it's worth it to do what we can--including leveraging > >> the > >>>> Java > >>>>>>>>>> language's type system--to help them, so IMO there's value to > >>>>>>>> eliminating > >>>>>>>>>> the implicit behavior of failing when a policy returns RETRY > >>> for a > >>>>>>>>>> non-retriable error. This can take a variety of shapes and I'm > >>> not > >>>>>>>> going > >>>>>>>>> to > >>>>>>>>>> insist on anything specific, but I do want to again raise my > >>>>> concerns > >>>>>>>>> with > >>>>>>>>>> the current proposal and request that we find something a > >> little > >>>>>>>> better. > >>>>>>>>>> > >>>>>>>>>> 3) Concerning the default implementation--actually, I meant > >>> what I > >>>>>>>> wrote > >>>>>>>>> :) > >>>>>>>>>> I don't want a "second" default, I want an implementation of > >>> this > >>>>>>>>> interface > >>>>>>>>>> to be used as the default if no others are specified. 
The > >>> behavior > >>>>> of > >>>>>>>>> this > >>>>>>>>>> default implementation would be identical to existing behavior > >>> (so > >>>>>>>> there > >>>>>>>>>> would be no backwards compatibility concerns like the ones > >>> raised > >>>> by > >>>>>>>>>> Matthias), but it would be possible to configure this default > >>>>> handler > >>>>>>>>> class > >>>>>>>>>> to behave differently for a basic set of scenarios. This would > >>>>> mirror > >>>>>>>>> (pun > >>>>>>>>>> intended) the approach we've taken with Mirror Maker 2 and its > >>>>>>>>>> ReplicationPolicy interface [1]. There is a default > >>> implementation > >>>>>>>>>> available [2] that recognizes a handful of basic configuration > >>>>>>>> properties > >>>>>>>>>> [3] for simple tweaks, but if users want, they can also > >>> implement > >>>>>> their > >>>>>>>>> own > >>>>>>>>>> replication policy for more fine-grained logic if those > >>> properties > >>>>>>>> aren't > >>>>>>>>>> flexible enough. > >>>>>>>>>> > >>>>>>>>>> More concretely, I'm imagining something like this for the > >>>> producer > >>>>>>>>>> exception handler: > >>>>>>>>>> > >>>>>>>>>> - Default implementation class > >>>>>>>>>> of > >>>> org.apache.kafka.clients.producer.DefaultProducerExceptionHandler > >>>>>>>>>> - This class would recognize two properties: > >>>>>>>>>> - drop.invalid.large.records: Boolean property, defaults to > >>>> false. > >>>>> If > >>>>>>>>>> "false", then causes the handler to return FAIL whenever > >>>>>>>>>> a RecordTooLargeException is encountered; if "true", then > >> causes > >>>>>>>>>> SWALLOW/SKIP/DROP to be returned instead > >>>>>>>>>> - unknown.topic.partition.retry.timeout.ms: Integer > >> property, > >>>>>>>> defaults > >>>>>>>>>> to > >>>>>>>>>> INT_MAX. 
Whenever an UnknownTopicOrPartitionException is > >>>>> encountered, > >>>>>>>>>> causes the handler to return FAIL if that record has been > >>> pending > >>>>> for > >>>>>>>>> more > >>>>>>>>>> than the retry timeout; otherwise, causes RETRY to be returned > >>>>>>>>>> > >>>>>>>>>> I think this is worth addressing now instead of later because > >> it > >>>>>> forces > >>>>>>>>> us > >>>>>>>>>> to evaluate the usefulness of this interface and it addresses > >> a > >>>>>>>>>> long-standing issue not just with Kafka Connect, but with the > >>> Java > >>>>>>>>> producer > >>>>>>>>>> in general. For reference, here are a few tickets I collected > >>>> after > >>>>>>>>> briefly > >>>>>>>>>> skimming our Jira showing that this is a real pain point for > >>>> users: > >>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-10340, > >>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-12990, > >>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-13634. Although > >>> this > >>>> is > >>>>>>>>>> frequently reported with Kafka Connect, it applies to anyone > >> who > >>>>>>>>> configures > >>>>>>>>>> a producer to use a high retry timeout. I am aware of the > >>>>>> max.block.ms > >>>>>>>>>> property, but it's painful and IMO poor behavior to require > >>> users > >>>> to > >>>>>>>>> reduce > >>>>>>>>>> the value of this property just to handle the single scenario > >>> when > >>>>>>>> trying > >>>>>>>>>> to write to topics that don't exist, since it would also limit > >>> the > >>>>>>>> retry > >>>>>>>>>> timeout for other operations that are legitimately retriable. > >>>>>>>>>> > >>>>>>>>>> Raising new points: > >>>>>>>>>> > >>>>>>>>>> 5) I don't see the interplay between this handler and existing > >>>>>>>>>> retry-related properties mentioned anywhere in the KIP. 
I'm > >>>> assuming > >>>>>>>> that > >>>>>>>>>> properties like "retries", "max.block.ms", and " > >>>> delivery.timeout.ms > >>>>> " > >>>>>>>>> would > >>>>>>>>>> take precedence over the handler and once they are exhausted, > >>> the > >>>>>>>>>> record/batch will fail no matter what? If so, it's probably > >>> worth > >>>>>>>> briefly > >>>>>>>>>> mentioning this (no more than a sentence or two) in the KIP, > >> and > >>>> if > >>>>>>>> not, > >>>>>>>>>> I'm curious what you have in mind. > >>>>>>>>>> > >>>>>>>>>> 6) I also wonder if the API provides enough information in its > >>>>> current > >>>>>>>>>> form. Would it be possible to provide handlers with some way > >> of > >>>>>>>> tracking > >>>>>>>>>> how long a record has been pending for (i.e., how long it's > >> been > >>>>> since > >>>>>>>>> the > >>>>>>>>>> record was provided as an argument to Producer::send)? One way > >>> to > >>>> do > >>>>>>>> this > >>>>>>>>>> could be to add a method like `onNewRecord(ProducerRecord)` > >> and > >>>>>>>>>> allow/require the handler to do its own bookkeeping, probably > >>>> with a > >>>>>>>>>> matching `onRecordSuccess(ProducerRecord)` method so that the > >>>>> handler > >>>>>>>>>> doesn't eat up an ever-increasing amount of memory trying to > >>> track > >>>>>>>>> records. > >>>>>>>>>> An alternative could be to include information about the > >> initial > >>>>> time > >>>>>>>> the > >>>>>>>>>> record was received by the producer and the number of retries > >>> that > >>>>>> have > >>>>>>>>>> been performed for it as parameters in the handle method(s), > >> but > >>>> I'm > >>>>>>>> not > >>>>>>>>>> sure how easy this would be to implement and it might clutter > >>>> things > >>>>>>>> up a > >>>>>>>>>> bit too much. > >>>>>>>>>> > >>>>>>>>>> 7) A small request--can we add Closeable (or, if you prefer, > >>>>>>>>> AutoCloseable) > >>>>>>>>>> as a superinterface for the handler interface? 
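Chris's proposed default (item 3), combined with the pending-time information he asks for in item 6, could look roughly like this. It is a hypothetical illustration, not Kafka code. `DefaultHandlerSketch`, `Decision`, and the method names are invented here, and the sketch assumes the handler is told when the record was first passed to `send()`. It does not model how `retries`, `max.block.ms`, or `delivery.timeout.ms` might cut retries short (the open question in item 5).

```java
import java.util.Map;

// Hypothetical decision type for this sketch.
enum Decision { FAIL, SWALLOW, RETRY }

// Hypothetical default handler configured with the two properties Chris proposes.
class DefaultHandlerSketch {
    static final String DROP_LARGE = "drop.invalid.large.records";
    static final String RETRY_TIMEOUT = "unknown.topic.partition.retry.timeout.ms";

    private final boolean dropInvalidLargeRecords;
    private final long retryTimeoutMs;

    DefaultHandlerSketch(Map<String, String> configs) {
        // Defaults as proposed: fail on oversized records; INT_MAX ms retry timeout,
        // i.e. the handler itself effectively never gives up on an unknown topic.
        this.dropInvalidLargeRecords = Boolean.parseBoolean(configs.getOrDefault(DROP_LARGE, "false"));
        this.retryTimeoutMs = Long.parseLong(
                configs.getOrDefault(RETRY_TIMEOUT, String.valueOf(Integer.MAX_VALUE)));
    }

    Decision onRecordTooLarge() {
        return dropInvalidLargeRecords ? Decision.SWALLOW : Decision.FAIL;
    }

    // FAIL once the record has been pending for longer than the retry timeout, otherwise RETRY.
    Decision onUnknownTopicOrPartition(long firstSendTimeMs, long nowMs) {
        long pendingMs = nowMs - firstSendTimeMs;
        return pendingMs > retryTimeoutMs ? Decision.FAIL : Decision.RETRY;
    }
}
```

With no properties set, the sketch behaves like today's producer for both errors. A Connect worker could then lower only the unknown-topic timeout without touching `max.block.ms` for every other operation.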
> >>>>>>>>>> > >>>>>>>>>> [1] - > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>> > >>>>>>>> > >>>>>> > >>>>> > >>>> > >>> > >> > https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/ReplicationPolicy.html > >>>>>>>>>> [2] - > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>> > >>>>>>>> > >>>>>> > >>>>> > >>>> > >>> > >> > https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html > >>>>>>>>>> [3] - > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>> > >>>>>>>> > >>>>>> > >>>>> > >>>> > >>> > >> > https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html#SEPARATOR_CONFIG > >>>>>>>>>> , > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>> > >>>>>>>> > >>>>>> > >>>>> > >>>> > >>> > >> > https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html#INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG > >>>>>>>>>> > >>>>>>>>>> Cheers, > >>>>>>>>>> > >>>>>>>>>> Chris > >>>>>>>>>> > >>>>>>>>>> On Wed, May 8, 2024 at 12:37 AM Matthias J. Sax < > >>> mj...@apache.org > >>>>> > >>>>>>>>> wrote: > >>>>>>>>>> > >>>>>>>>>>> Very interesting discussion. > >>>>>>>>>>> > >>>>>>>>>>> It seems a central point is the question about "how generic" we > >>>>> approach > >>>>>>>>>>> this, and some people think we need to be conservative and > >>> others > >>>>>>>> think > >>>>>>>>>>> we should try to be as generic as possible. > >>>>>>>>>>> > >>>>>>>>>>> Personally, I think following a limited scope for this KIP > >> (by > >>>>>>>>>>> explicitly saying we only cover RecordTooLarge and > >>>>>>>>>>> UnknownTopicOrPartition) might be better. We have concrete > >>>> tickets > >>>>>>>> that > >>>>>>>>>>> we address, while for other exceptions (like authorization) we > >>>> don't > >>>>>>>>> know > >>>>>>>>>>> if people want to handle them to begin with. Boiling the ocean > >>>> might > >>>>>>>> not > >>>>>>>>>>> get us too far, and being somewhat pragmatic might help to > >> move > >>>>> this > >>>>>>>>> KIP > >>>>>>>>>>> forward.
-- I also agree with Justine and Artem that we want > >> to > >>>> be > >>>>>>>>>>> careful anyway to not allow users to break stuff too easily. > >>>>>>>>>>> > >>>>>>>>>>> At the same time, I agree that we should set up this change > >> in a > >>>>>>>> forward-looking > >>>>>>>>>>> way, and thus having a single generic handler allows > >> us > >>>> to > >>>>>>>>> later > >>>>>>>>>>> extend the handler more easily. This should also simplify > >>> follow-up > >>>> KIPs > >>>>>>>>>>> that might add new error cases (I actually mentioned one more > >>> to > >>>>>>>> Alieh > >>>>>>>>>>> already, but we both agreed that it might be best to exclude > >> it > >>>>> from > >>>>>>>>> the > >>>>>>>>>>> KIP right now, to make the 3.8 deadline. Doing a follow-up > >> KIP > >>> is > >>>>> not > >>>>>>>>>>> the end of the world.) > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> @Chris: > >>>>>>>>>>> > >>>>>>>>>>> (2) This sounds fair to me, but not sure how "bad" it > >> actually > >>>>> would > >>>>>>>>> be? > >>>>>>>>>>> If the contract is clearly defined, what the KIP proposes seems > >>> to > >>>>> be > >>>>>>>>>>> fine, and given that such a handler is an expert API, and > >>> we > >>>>> can > >>>>>>>>>>> provide "best practices" (cf my other comment below in > >> [AL1]), > >>>>> being > >>>>>>>> a > >>>>>>>>>>> little bit pragmatic sounds fine to me. > >>>>>>>>>>> > >>>>>>>>>>> I'm not sure I understand Justine's argument on this question. > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> (3) About having a default impl or not. I am fine with adding > >>>> one, > >>>>>>>> even > >>>>>>>>>>> if I am not sure why Connect could not just add its own one > >> and > >>>>> plug > >>>>>>>> it > >>>>>>>>>>> in (and we would add corresponding configs for Connect, but > >> not > >>>> for > >>>>>>>> the > >>>>>>>>>>> producer itself)?
For this case, we could also do this as a > >>>> follow-up > >>>>>>>>>>> KIP, but happy to include it in this KIP to provide value to > >>>>> Connect > >>>>>>>>>>> right away (even if the value might not come right away if we > >>>> miss > >>>>>>>> the > >>>>>>>>>>> 3.8 deadline due to expanded KIP scope...) -- For KS, we > >> would > >>>> for > >>>>>>>>> sure > >>>>>>>>>>> plug in our own impl, and lock down the config such that users > >>>>> cannot > >>>>>>>>> set > >>>>>>>>>>> their own handler on the internal producer to begin with. > >> Might > >>>> be > >>>>>>>> good > >>>>>>>>>>> to elaborate why the producer should have a default? We might > >>>>>>>> actually > >>>>>>>>>>> want to add this to the KIP right away? > >>>>>>>>>>> > >>>>>>>>>>> The key requirement for a default impl would be to not change the > >> current > >>>>>>>>> behavior, > >>>>>>>>>>> and having no default seems to achieve this. For the two > >> cases > >>>> you > >>>>>>>>>>> mentioned, it's unclear to me what default value on "upper > >>> bound > >>>> on > >>>>>>>>>>> retries" for UnknownTopicOrPartitionException we should set? > >>> It seems > >>>>> it > >>>>>>>>>>> would need to be the same as `delivery.timeout.ms`? However, > >>> if > >>>>>>>> users > >>>>>>>>>>> have `delivery.timeout.ms` actually overridden we would > >> need > >>> to > >>>>> set > >>>>>>>>>>> this config somewhat "dynamically"? Is this feasible? If we > >>>> hard-code 2 > >>>>>>>>>>> minutes, it might not be backward compatible. I have the > >>>> impression > >>>>>>>> we > >>>>>>>>>>> might introduce some undesired coupling? -- For the "record > >> too > >>>>>>>> large" > >>>>>>>>>>> case, the config seems to be boolean and setting it to > >> `false` > >>> by > >>>>>>>>>>> default seems to provide backward compatibility.
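Artem's [AL3] alternative, quoted below, is to validate a record locally before calling `send()`, so an oversized record is rejected up front rather than reported through a callback. Here is a rough sketch, assuming string keys and values and a hypothetical `RecordSizeValidator` class (not a producer API). It counts only key and value bytes, whereas the real producer also accounts for record overhead and headers, so it underestimates the true size.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical up-front validation, in the spirit of the validateRecord idea.
final class RecordSizeValidator {
    private final int maxRecordBytes; // e.g. taken from the producer's max.request.size

    RecordSizeValidator(int maxRecordBytes) {
        this.maxRecordBytes = maxRecordBytes;
    }

    // Counts only UTF-8 key and value bytes; record overhead and headers are ignored.
    boolean isValid(String key, String value) {
        return utf8Length(key) + utf8Length(value) <= maxRecordBytes;
    }

    private static int utf8Length(String s) {
        return s == null ? 0 : s.getBytes(StandardCharsets.UTF_8).length;
    }
}
```

An application would call `isValid` before `send()` and skip or report records that fail, which is step 1 of Artem's proposal. Because the estimate is low, a record that passes can still be rejected by the producer.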
> >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> @Artem: > >>>>>>>>>>> > >>>>>>>>>>> [AL1] While I see the point, I would think having a different > >>>>>>>> callback > >>>>>>>>>>> for every exception might not really be elegant? In the end, > >>> the > >>>>>>>>> handler > >>>>>>>>>>> is a very advanced feature anyway, and if it's implemented > >> in > >>> a > >>>>> bad > >>>>>>>>>>> way, well, it's a user error -- we cannot protect users from > >>>>>>>>> everything. > >>>>>>>>>>> To me, a handler like this is to some extent "business > >> logic" > >>>> and > >>>>>>>> if a > >>>>>>>>>>> user gets business logic wrong, it's hard to protect them. -- > >>> We > >>>>>>>> would > >>>>>>>>>>> of course provide best practice guidance in the JavaDocs, and > >>>>> explain > >>>>>>>>>>> that a handler should have explicit `if` statements for stuff > >>> it > >>>>> wants > >>>>>>>>> to > >>>>>>>>>>> handle, and only a single default which returns FAIL. > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> [AL2] Yes, but for KS we would retry at the application > >> layer. > >>>> I.e., > >>>>>>>> the > >>>>>>>>>>> TX is not completed, we clean up and set up our task from > >>> scratch, > >>>>> to > >>>>>>>>>>> ensure the pending transaction is completed before we resume. > >>> If > >>>>> the > >>>>>>>> TX > >>>>>>>>>>> was indeed aborted, we would retry from an older offset and thus > >>>> just > >>>>>>>> hit > >>>>>>>>>>> the same error again and the loop begins again. > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> [AL2 cont.] Similar to AL1, I see such a handler to some > >> extent > >>>> as > >>>>>>>>>>> business logic. If a user puts a bad filter condition in > >> their > >>> KS > >>>>>>>> app, > >>>>>>>>>>> and drops messages, there is nothing we can do about it, and this > >>>> handler, > >>>>>>>>>>> IMHO, has a similar purpose.
This is also the line of > >> thinking > >>> I > >>>>>>>> apply > >>>>>>>>>>> to EOS, to address Justine's concern about "should we allow to > >>>> drop > >>>>>>>> for > >>>>>>>>>>> EOS", and my answer is "yes", because it's more business > >> logic > >>>> than > >>>>>>>>>>> actual error handling IMHO. And by default, we fail... So > >> users > >>>>>>>> opt in > >>>>>>>>>>> to adding business logic to drop records. It's an application-level > >>>>>>>>>>> decision how to write the code. > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> [AL3] Maybe I misunderstand what you are saying, but to me, > >>>>> checking > >>>>>>>>> the > >>>>>>>>>>> size of the record upfront is exactly what the KIP proposes? > >>> No? > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> @Justine: > >>>>>>>>>>> > >>>>>>>>>>>> I saw the sample > >>>>>>>>>>>> code -- is it just an if statement checking for the error > >>> before > >>>>>>>> the > >>>>>>>>>>>> handler is invoked? That seems a bit fragile. > >>>>>>>>>>> > >>>>>>>>>>> What do you mean by fragile? I'm not sure I see your point. > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> -Matthias > >>>>>>>>>>> > >>>>>>>>>>> On 5/7/24 5:33 PM, Artem Livshits wrote: > >>>>>>>>>>>> Hi Alieh, > >>>>>>>>>>>> > >>>>>>>>>>>> Thanks for the KIP. The motivation talks about very > >> specific > >>>>>>>> cases, > >>>>>>>>>> but > >>>>>>>>>>>> the interface is generic. > >>>>>>>>>>>> > >>>>>>>>>>>> [AL1] > >>>>>>>>>>>> If the interface evolves in the future I think we could have > >>> the > >>>>>>>>>>> following > >>>>>>>>>>>> confusion: > >>>>>>>>>>>> > >>>>>>>>>>>> 1. A user implemented the SWALLOW action for both > >>>>>>>> RecordTooLargeException > >>>>>>>>>> and > >>>>>>>>>>>> UnknownTopicOrPartitionException. For simplicity they just > >>>> return > >>>>>>>>>> SWALLOW > >>>>>>>>>>>> from the function, because it elegantly handles all known > >>> cases. > >>>>>>>>>>>> 2. The interface has evolved to support a new exception.
> >>>>>>>>>>>> 3. The user has upgraded their Kafka client. > >>>>>>>>>>>> > >>>>>>>>>>>> Now a new kind of error gets dropped on the floor without > >>> the user's > >>>>>>>>>>> intention > >>>>>>>>>>>> and it would be super hard to detect and debug. > >>>>>>>>>>>> > >>>>>>>>>>>> To avoid the confusion, I think we should use handlers for > >>>>> specific > >>>>>>>>>>>> exceptions. Then if a new exception is added it won't get > >>>>> silently > >>>>>>>>>>> swallowed > >>>>>>>>>>>> because the user would need to add new functionality to > >> handle > >>>> it. > >>>>>>>>>>>> > >>>>>>>>>>>> I also have some higher-level comments: > >>>>>>>>>>>> > >>>>>>>>>>>> [AL2] > >>>>>>>>>>>>> it throws a TimeoutException, and the user can only blindly > >>>>> retry, > >>>>>>>>>> which > >>>>>>>>>>>> may result in an infinite retry loop > >>>>>>>>>>>> > >>>>>>>>>>>> If the TimeoutException happens during transactional > >>> processing > >>>>>>>>>> (exactly > >>>>>>>>>>>> once is the desired semantics), then the client should not > >>> retry > >>>>>>>> when > >>>>>>>>>> it > >>>>>>>>>>>> gets TimeoutException because without knowing the reason for > >>>>>>>>>>>> TimeoutExceptions, the client cannot know whether the > >> message > >>>> was > >>>>>>>>>>> actually > >>>>>>>>>>>> produced or not and retrying the message may result in > >>>>> duplicates. > >>>>>>>>>>>> > >>>>>>>>>>>>> The thrown TimeoutException "cuts" the connection to the > >>>>>>>> underlying > >>>>>>>>>> root > >>>>>>>>>>>> cause of missing metadata > >>>>>>>>>>>> > >>>>>>>>>>>> Maybe we should fix the error handling and return the proper > >>>>>>>>> underlying > >>>>>>>>>>>> message? Then the application can properly handle the > >> message > >>>>>>>> based > >>>>>>>>> on > >>>>>>>>>>>> preferences. > >>>>>>>>>>>> > >>>>>>>>>>>> From the product perspective, it's not clear how safe it is > >> to > >>>>>>>>> blindly > >>>>>>>>>>>> ignore UnknownTopicOrPartitionException.
This could lead to situations where a simple typo leads to massive data loss (part of the data would effectively be produced to a "black hole", and the user may not notice it for a while).

In which situations would you recommend that the user "black hole" messages in case of misconfiguration?

[AL3]

> If the custom handler decides on SWALLOW for RecordTooLargeException,

Is my understanding correct that this KIP proposes functionality that would only be able to SWALLOW a RecordTooLargeException that happens because the producer cannot produce the record (if the broker rejects the batch, the error won't get to the handler, because we cannot know which other records get ignored)? In this case, why not just check the locally configured max record size upfront and not produce the record in the first place? Maybe we can expose a validation function from the producer that could validate the records locally, so we don't need to produce the record in order to know that it's invalid.

-Artem

On Tue, May 7, 2024 at 2:07 PM Justine Olshan <jols...@confluent.io.invalid> wrote:

Alieh and Chris,

Thanks for clarifying 1), but I saw the motivation.
I guess I just didn't understand how that would be ensured on the producer side. I saw the sample code -- is it just an if statement checking for the error before the handler is invoked? That seems a bit fragile.

Can you clarify what you mean by `since the code does not reach the KS interface and breaks somewhere in producer`? If we surfaced this error to the application in a better way, would that also be a solution to the issue?

Justine

On Tue, May 7, 2024 at 1:55 PM Alieh Saeedi <asae...@confluent.io.invalid> wrote:

Hi,

Thank you, Chris and Justine, for the feedback.

@Chris

1) Flexibility: it has two meanings. The first meaning is the one you mentioned: we are going to cover more exceptions in the future, but as Justine mentioned, we must be very conservative about adding more exceptions. Additionally, flexibility mainly means that the user is able to develop their own code. As mentioned in the motivation section and the examples, sometimes the user decides on dropping a record based on, for example, the topic.
2) Defining two separate methods for retriable and non-retriable exceptions: although the idea is brilliant, the user may still make a mistake by implementing the wrong method and see unexpected behaviour. For example, he may implement handleRetriable() for RecordTooLargeException and define SWALLOW for the exception, but in practice he sees no change in the default behaviour, since he implemented the wrong method. I think we can never reduce the user's mistakes to zero.

3) Default implementation for Handler: the default behaviour is already preserved with NO need to implement any handler or set the corresponding config parameter `custom.exception.handler`. What you mean is actually having a second default, which requires having both the interface and config parameters. About UnknownTopicOrPartitionException: the producer already offers the config parameter `max.block.ms`, which determines the duration of retrying. The main purpose of a user who needs the handler is to get the root cause of the TimeoutException and handle it in the way he intends. The KIP explains why this is necessary for KS users.
4) Naming issue: by SWALLOW, we meant actually swallowing the error, while SKIP means skipping the record, I think. If it makes sense to more people, I can change it to SKIP.

@Justine

1) was addressed by Chris.

2 and 3) The problem is exactly what you mentioned. Currently, there is no way to handle these issues application-side. Even KS users who implement the KS ProductionExceptionHandler are not able to handle the exceptions as they intend, since the code does not reach the KS interface and breaks somewhere in the producer.

Cheers,
Alieh

On Tue, May 7, 2024 at 8:43 PM Chris Egerton <fearthecel...@gmail.com> wrote:

Hi Justine,

The method signatures for the interface are indeed open-ended, but the KIP states that its uses will be limited. See the motivation section:

> We believe that the user should be able to develop custom exception handlers for managing producer exceptions. On the other hand, this will be an expert-level API, and using that may result in strange behaviour in the system, making it hard to find the root cause.
> Therefore, the custom handler is currently limited to handling RecordTooLargeException and UnknownTopicOrPartitionException.

Cheers,

Chris

On Tue, May 7, 2024, 14:37 Justine Olshan <jols...@confluent.io.invalid> wrote:

Hi Alieh,

I was out for KSB and then was also sick. :(

To your point 1), Chris: I don't think it is limited to two specific scenarios, since the interface accepts a generic Exception e and can be implemented to check whether that e is an instanceof any exception. I didn't see anywhere that specific errors are enforced. I'm a bit concerned about this, actually: I'm concerned about the open-endedness and the contract we have with transactions. We are allowing the client to make decisions that are somewhat invisible to the server. As an aside, can we build in log messages when the handler decides to skip (etc.) a message? I'm really concerned about messages being silently dropped.

I do think Chris's point 2) about retriable vs. non-retriable errors is fair.
I'm a bit concerned about skipping an unknown topic or partition exception too early, as there are cases where it can be transient.

I'm still a little bit wary of allowing records to be dropped as part of EOS generally, as in many cases these errors signify an issue with the original data. I understand that Streams and Connect/MirrorMaker may have reasons they want to progress past these messages, but I'm wondering if there is a way that can be done application-side. I'm willing to accept this sort of proposal if we can make it clear that this sort of thing is happening and we limit the blast radius for what we can do.

Justine

On Tue, May 7, 2024 at 9:55 AM Chris Egerton <chr...@aiven.io.invalid> wrote:

Hi Alieh,

Sorry for the delay, I've been out sick. I still have some thoughts that I'd like to see addressed before voting.

1) If flexibility is the motivation for a pluggable interface, why are we limiting the uses for this interface to only two very specific scenarios?
Why not also allow, e.g., authorization errors to be handled as well (allowing users to drop records destined for some off-limits topics, or retry for a limited duration in case there's a delay in the propagation of ACL updates)? It'd be nice to see some analysis of other errors that could be handled with this new API, both to avoid the follow-up work of another KIP to address them in the future, and to make sure that we're not painting ourselves into a corner with the current API in a way that would make future modifications difficult.

2) Something feels a bit off with how retriable vs. non-retriable errors are handled with the interface. Why not introduce two separate methods to handle each case separately? That way there's no ambiguity or implicit behavior when, e.g., attempting to retry on a RecordTooLargeException. This could be something like `NonRetriableResponse handle(ProducerRecord, Exception)` and `RetriableResponse handleRetriable(ProducerRecord, Exception)`, though the exact names and shape can obviously be toyed with a bit.
3) Although the flexibility of a pluggable interface may benefit some users' custom producer applications and Kafka Streams applications, it comes at significant deployment cost for other low-/no-code environments, including but not limited to Kafka Connect and MirrorMaker 2. Can we add a default implementation of the exception handler that allows for some simple behavior to be tweaked via configuration properties? Two things that would be nice to have would be A) an upper bound on the retry time for unknown-topic-partition exceptions and B) an option to drop records that are large enough to trigger a record-too-large exception.

4) I'd still prefer to see "SKIP" or "DROP" instead of the proposed "SWALLOW" option, which IMO is opaque and non-obvious, especially when trying to guess the behavior for retriable errors.
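Chris's points 2 and 3 can be sketched together in Java. This is a minimal sketch under stated assumptions, not the KIP's API. The enum values, the `TwoMethodHandler` and `ConfigurableHandler` names, the `String topic` parameter (standing in for `ProducerRecord`), and the `elapsed` argument used for the retry bound are all hypothetical. The exception classes are local stand-ins for the Kafka ones, so the snippet compiles on its own.

```java
import java.time.Duration;

// Hypothetical response types: RETRY can only be expressed for retriable errors.
enum NonRetriableResponse { FAIL, SWALLOW }

enum RetriableResponse { FAIL, SWALLOW, RETRY }

// Stand-ins for org.apache.kafka.common.errors.* (hypothetical, compile without Kafka).
class RecordTooLargeException extends RuntimeException {}

class UnknownTopicOrPartitionException extends RuntimeException {}

// Hypothetical two-method handler; the defaults roughly mirror today's behaviour.
interface TwoMethodHandler {
    // Non-retriable errors fail by default.
    default NonRetriableResponse handle(String topic, Exception e) {
        return NonRetriableResponse.FAIL;
    }

    // Retriable errors keep retrying by default (until the producer's own timeout).
    default RetriableResponse handleRetriable(String topic, Exception e, Duration elapsed) {
        return RetriableResponse.RETRY;
    }
}

// A configuration-driven default of the kind point 3 asks for:
// A) an upper bound on retry time for unknown topics/partitions,
// B) an opt-in to drop records that are too large.
class ConfigurableHandler implements TwoMethodHandler {
    private final Duration maxUnknownTopicRetry; // hypothetical config value
    private final boolean dropOversizedRecords;  // hypothetical config value

    ConfigurableHandler(Duration maxUnknownTopicRetry, boolean dropOversizedRecords) {
        this.maxUnknownTopicRetry = maxUnknownTopicRetry;
        this.dropOversizedRecords = dropOversizedRecords;
    }

    @Override
    public NonRetriableResponse handle(String topic, Exception e) {
        if (e instanceof RecordTooLargeException && dropOversizedRecords) {
            return NonRetriableResponse.SWALLOW;
        }
        return NonRetriableResponse.FAIL;
    }

    @Override
    public RetriableResponse handleRetriable(String topic, Exception e, Duration elapsed) {
        if (e instanceof UnknownTopicOrPartitionException
                && elapsed.compareTo(maxUnknownTopicRetry) >= 0) {
            return RetriableResponse.FAIL; // give up once the configured bound is reached
        }
        return RetriableResponse.RETRY;
    }
}
```

With this split, RETRY simply cannot be returned for a non-retriable error such as RecordTooLargeException. Alieh's counterpoint, that a user can still implement the wrong method, remains valid.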
Cheers,

Chris

On Fri, May 3, 2024 at 11:23 AM Alieh Saeedi <asae...@confluent.io.invalid> wrote:

Hi all,

A summary of the KIP and the discussions:

The KIP introduces a handler interface for the Producer in order to handle two exceptions: RecordTooLargeException and UnknownTopicOrPartitionException. The handler handles the exceptions per record.

- Do we need this handler? [Motivation and Examples sections]

RecordTooLargeException: 1) In transactions, the producer collects multiple records in batches. Then a RecordTooLargeException related to a single record leads to failing the entire batch. A custom exception handler in this case may decide on dropping the record and continuing the processing. See Example 1, please. 2) Moreover, in Kafka Streams, a record that is too large is a poison pill record, and there is no way to skip over it.
A handler would allow us to react to this error inside the producer, i.e., local to where the error happens, and thus simplify the overall code significantly. Please read the Motivation section for more explanation.

UnknownTopicOrPartitionException: For this case, the producer handles this exception internally: it only issues a WARN log about missing metadata and retries. Later, when the producer hits "delivery.timeout.ms", it throws a TimeoutException, and the user can only blindly retry, which may result in an infinite retry loop. The thrown TimeoutException "cuts" the connection to the underlying root cause of missing metadata (which could indeed be a transient error, but is persistent for a non-existing topic). Thus, there is no programmatic way to break the infinite retry loop. Kafka Streams also blindly retries in this case, and the application gets stuck.
- Having an interface vs. a configuration option: [Motivation, Examples, and Rejected Alternatives sections]

Our solution is introducing an interface due to the full flexibility that it offers. Sometimes users, especially Kafka Streams ones, determine the handler's behaviour based on the situation. For example, facing UnknownTopicOrPartitionException, the user may want to raise an error for some topics but retry for other topics. Having a configuration option with a fixed set of possibilities does not serve the user's needs. See Example 2, please.

- Note on RecordTooLargeException: [Public Interfaces section]

If the custom handler decides on SWALLOW for RecordTooLargeException, then this record will not be part of the transactional batch, and it will also not be sent to the broker in non-transactional mode. So there is no need to worry about getting a RecordTooLargeException from the broker in this case, as the record will never be sent to the broker. SWALLOW means drop the record and continue/swallow the error.
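The SWALLOW semantics described here can be illustrated with a simplified model. The oversized record is rejected before it joins a batch, so no other record is affected. In this sketch, `OversizeCheckSketch`, the `IntFunction<Response>` handler callback, and the local `RecordTooLargeException` stand-in are hypothetical. `maxRequestSize` plays the role of the producer's `max.request.size` setting, and the check compares the raw payload length rather than the producer's real size estimate. Following the summary's rule for this exception, RETRY is treated like FAIL.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;

// Hypothetical single response type, as discussed at this point of the thread.
enum Response { FAIL, RETRY, SWALLOW }

// Stand-in for org.apache.kafka.common.errors.RecordTooLargeException.
class RecordTooLargeException extends RuntimeException {
    RecordTooLargeException(String message) {
        super(message);
    }
}

// Simplified model of a client-side size check. An oversized record is rejected
// before it is appended to a batch, so dropping it cannot affect other records.
// Real size accounting (headers, batch overhead) is deliberately omitted.
class OversizeCheckSketch {
    private final int maxRequestSize;            // plays the role of max.request.size
    private final IntFunction<Response> handler; // hypothetical: record size -> decision
    final List<byte[]> batch = new ArrayList<>();

    OversizeCheckSketch(int maxRequestSize, IntFunction<Response> handler) {
        this.maxRequestSize = maxRequestSize;
        this.handler = handler;
    }

    /** Returns true if the record joined the batch, false if it was swallowed. */
    boolean append(byte[] serializedRecord) {
        if (serializedRecord.length > maxRequestSize) {
            Response decision = handler.apply(serializedRecord.length);
            if (decision == Response.SWALLOW) {
                return false; // dropped locally; never sent to the broker
            }
            // FAIL, and RETRY treated as FAIL: retrying cannot make the record smaller.
            throw new RecordTooLargeException("record of " + serializedRecord.length
                    + " bytes exceeds limit of " + maxRequestSize);
        }
        batch.add(serializedRecord);
        return true;
    }
}
```

Because the decision happens before the append, a SWALLOW never changes the contents of a batch that already holds other records. This is the property the note above relies on.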
- What if the handle() method implements RETRY for RecordTooLargeException? [Proposed Changes section]

We have to limit the user to FAIL or SWALLOW for RecordTooLargeException; in effect, RETRY is equivalent to FAIL. This is documented in the javadoc.

- What if the handle() method of the handler throws an exception?

The handler is expected to have correct code. If it throws an exception, everything fails.

This is a PoC PR <https://github.com/apache/kafka/pull/15846> ONLY for RecordTooLargeException. The code changes related to UnknownTopicOrPartitionException will be added to this PR LATER.

Looking forward to your feedback again.

Cheers,

Alieh

On Thu, Apr 25, 2024 at 11:46 PM Kirk True <k...@kirktrue.pro> wrote:

Hi Alieh,

Thanks for the updates!

Comments inline...
> On Apr 25, 2024, at 1:10 PM, Alieh Saeedi <asae...@confluent.io.INVALID> wrote:
>
> Hi all,
>
> Thanks a lot for the constructive feedback!
>
> Addressing some of the main concerns:
>
> - The `RecordTooLargeException` can be thrown by the broker, the producer, and the consumer. Of course, the `ProducerExceptionHandler` interface is introduced to affect only the exceptions thrown from the producer. This KIP very specifically means to provide a way to manage the `RecordTooLargeException` thrown from the Producer.send() method. Please see the “Proposed Changes” section for more clarity; I investigated the issue there thoroughly. I hope it also addresses the concern about how we handle the errors.
> - The problem with Callback: methods of Callback are called when the record sent to the server is acknowledged, while this is not the desired time for all exceptions. We intend to handle exceptions beforehand.

I guess it makes sense to keep the expectation for when Callback is invoked as-is vs. shoehorning more into it.

> - What if the custom handler returns RETRY for `RecordTooLargeException`? I assume changing the producer configuration at runtime is possible. If so, RETRY for a too-large record is valid, because maybe on the next try the too-large record is no longer poisoning. I am not 100% sure about the technical details, though. Otherwise, we can consider RETRY as FAIL for this exception. Another solution would be to allow RETRY a constant number of times, which can be useful for other exceptions as well.
It’s not presently possible to change the configuration of an existing Producer at runtime. So if a record hits a RecordTooLargeException once, no amount of retrying (with the current Producer) will change that fact. So I’m still a little stuck on how to handle a response of RETRY for an “oversized” record.

> - What if the handle() method itself throws an exception? I think, rationally and pragmatically, the behaviour must be exactly like when no custom handler is defined, since the user actually did not have a working handler.

I’m not convinced that ignoring an errant handler is the right choice. It then becomes a silent failure that might have repercussions, depending on the business logic. A user would have to proactively trawl through the logs for WARN/ERROR messages to catch it.

Throwing a hard error is pretty draconian, though…

> - Why not use config parameters instead of an interface?
> As explained in the “Rejected Alternatives” section, we assume that the handler will be used for a greater number of exceptions in the future. Defining a configuration parameter for each exception may make the configuration a bit messy. Moreover, the handler offers more flexibility.

Agreed that the logic-via-configuration approach is weird and limiting. Forget I ever suggested it ;)

I’d think additional background in the Motivation section would help me understand how users might use this feature beyond a) skipping “oversized” records, and b) not retrying missing topics.

> Small change:
>
> - ProductionExceptionHandlerResponse -> Response for brevity and simplicity. Could’ve been HandlerResponse too, I think!

The name change sounds good to me.

Thanks Alieh!

> I thank you all again for your useful questions/suggestions.
>
> I would be happy to hear more of your concerns, as stated in some feedback.
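As a rough illustration of the two use cases Kirk lists, here is a handler that decides per topic and uses the shorter `Response` name. Everything in it is hypothetical: the `handle(String, Exception)` signature, the `topicsCreatedLater` set, and the local exception stand-ins. It also reflects two concerns raised in this thread. Drops are logged rather than silent, and unrecognised exceptions fall back to FAIL, so a newly supported exception is not swallowed by accident.

```java
import java.util.Set;
import java.util.logging.Logger;

// Hypothetical response type, using the shorter name agreed above.
enum Response { FAIL, RETRY, SWALLOW }

// Stand-ins for the Kafka exception classes so the sketch compiles on its own.
class UnknownTopicOrPartitionException extends RuntimeException {}

class RecordTooLargeException extends RuntimeException {}

// Covers a) skipping oversized records and b) not retrying missing topics,
// with the decision made per topic.
class TopicAwareHandler {
    private static final Logger LOG = Logger.getLogger(TopicAwareHandler.class.getName());
    private final Set<String> topicsCreatedLater; // hypothetical: topics expected to appear eventually

    TopicAwareHandler(Set<String> topicsCreatedLater) {
        this.topicsCreatedLater = topicsCreatedLater;
    }

    Response handle(String topic, Exception e) {
        if (e instanceof UnknownTopicOrPartitionException) {
            // Retry only where a missing topic is expected to be transient;
            // anything else (e.g. a typo in the topic name) fails fast.
            return topicsCreatedLater.contains(topic) ? Response.RETRY : Response.FAIL;
        }
        if (e instanceof RecordTooLargeException) {
            // Log every drop so records are not lost silently.
            LOG.warning("Dropping oversized record for topic " + topic);
            return Response.SWALLOW;
        }
        // Unrecognised exceptions keep the default behaviour, so an exception
        // added to the handler's scope later is not swallowed by accident.
        return Response.FAIL;
    }
}
```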
> Cheers,
> Alieh
>
> On Wed, Apr 24, 2024 at 12:31 AM Justine Olshan <jols...@confluent.io.invalid> wrote:
>
>> Thanks Alieh for the updates.
>>
>> I'm a little concerned about the design pattern here. It seems like we want specific usages, but we are packaging it as a generic handler. I think we tried to narrow down on the specific errors we want to handle, but it feels a little clunky, as we have a generic thing for two specific errors.
>>
>> I'm wondering if we are using the right patterns to solve these problems. I agree, though, that we will need something more than the error classes I'm proposing if we want to have different handling be configurable. My concern is that the open-endedness of a handler means that we are creating more problems than we are solving. It is still unclear to me how we expect to handle the errors. Perhaps we could include an example?
>> It seems like there is a specific use case in mind and maybe we can