Hi Matthias,

I like this compromise a lot. +1 from me
Best,
Chris

On Mon, Jun 17, 2024, 16:15 Justine Olshan <jols...@confluent.io.invalid> wrote:

Makes sense to me.

Thanks Matthias for summarizing the state.

Justine

On Mon, Jun 17, 2024 at 1:12 PM Matthias J. Sax <mj...@apache.org> wrote:

Hey,

it seems this KIP is very difficult, and we actually had a lot of background discussion about it in the last weeks. I believe the problem with this KIP is that we have 3 starting points to look at the problem, but these 3 starting points don't align:

1. Producer API: we want a clean API design for developers using the producer directly (clean semantics, no footguns...)

2. Kafka Streams: as a power user of the producer, we want to have advanced capabilities; given how KS works internally, we need a "power API" on the producer

3. Kafka Connect: also a power user of the producer. However, Connect is a framework, not a programming API, and thus prefers a config-based approach

I also think we got one idea wrong: letting the user code / handler take care of retries. (I guess that's on me; I started with the idea of having a third return code, RETRY...) The handler does not have enough context information, and making this information available leads to a very clumsy interface. (Defeats (1) from above.)

I believe that if we moved forward with the handler, we would need to let the producer do retries, and only call the handler after all retries/timeouts are exhausted. However, for this to work, we need a producer config for Connect, which basically defeats the purpose of (2) to make it a programmatic solution (it seems somewhat redundant).

Also, the idea to make the handler configurable seems, in hindsight, like a poor approach / bad compromise to address (3) without sacrificing (2), but it is a problem for (1).
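The alternative Matthias describes, where the producer owns the retries and the handler is consulted only once they are exhausted, can be sketched roughly as follows. This is a hypothetical illustration, not Kafka's producer code: `RetryingSender`, `ExhaustedRetriesHandler`, and `FinalDecision` are invented names, and a simple attempt counter stands in for the real `retries` / `delivery.timeout.ms` budget.

```java
import java.util.function.BooleanSupplier;

// Hypothetical decision a handler returns once the producer has given up.
enum FinalDecision { FAIL, SWALLOW }

// Hypothetical handler: it is never asked whether to retry, only what to do at the end.
interface ExhaustedRetriesHandler {
    FinalDecision onRetriesExhausted(String topic, Exception lastError);
}

// Illustrative send loop in which the producer, not the handler, owns the retry budget.
final class RetryingSender {
    private final int maxAttempts; // stand-in for retries / delivery.timeout.ms
    private final ExhaustedRetriesHandler handler;

    RetryingSender(int maxAttempts, ExhaustedRetriesHandler handler) {
        this.maxAttempts = maxAttempts;
        this.handler = handler;
    }

    /** Returns true if an attempt succeeded, false if the handler chose to swallow the record. */
    boolean send(String topic, BooleanSupplier attempt) {
        Exception last = null;
        for (int i = 1; i <= maxAttempts; i++) {
            try {
                if (attempt.getAsBoolean()) {
                    return true; // success: the handler is never involved
                }
                last = new IllegalStateException("attempt " + i + " failed");
            } catch (RuntimeException e) {
                last = e; // a thrown error also consumes one attempt
            }
        }
        // Budget exhausted: consult the handler exactly once.
        if (handler.onRetriesExhausted(topic, last) == FinalDecision.SWALLOW) {
            return false;
        }
        throw new RuntimeException("send to " + topic + " failed after " + maxAttempts + " attempts", last);
    }
}
```

The handler needs no retry context here, which is the point: all timing state stays inside the sender.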

We also discussed the "missing metadata" case, and actually believe we can address it without a public API change. Alieh put up a PR for this already: https://github.com/apache/kafka/pull/16344

This leaves us with the "producer error state problem for EOS", but it might be better to solve this differently. Alieh has now started KIP-1059 for this case.

Thus, it seems we should DISCARD this KIP, and the Connect team can do a follow-up KIP to add the producer configs they need for their own situation.

Splitting the solutions tailored for the different situations seems to lead to an overall cleaner solution to the problem.

Thoughts?

-Matthias

On 5/15/24 12:30 AM, Federico Valeri wrote:

Hello Alieh, thanks for this useful KIP.

There is a typo in the motivation where you talk about the UnknownTopicOrPartitionException. It's delivery.timeout.ms, not deliver.timeout.ms.

In the past, I did some work to improve and clean up the official Kafka examples, which I think are useful for new Kafka users. I was wondering if it is worth improving them to show the correct usage of this new interface. If you agree, maybe we could mention this in the proposed changes.

> The accepted responses for RecordTooLargeException are FAIL and SWALLOW. Therefore, RETRY will be interpreted and executed as FAIL.

Why do we need this javadoc note? I think it's not possible to return RETRY in the current form.

When we talk about swallowing in the default implementation, I think we will log an error/warning and drop the record, right? If yes, should we clarify this and improve the DROP_INVALID_LARGE_RECORDS_DOC by mentioning the logging part?

Should we mention somewhere which logic takes precedence when both the interface and configs are used?
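To make Federico's precedence question concrete, here is a minimal sketch of the rule stated later in the thread ("the handler takes precedence"), plus the log-then-drop behavior he assumes for swallowing. All type names are hypothetical, and modelling "the handler has no opinion" as an empty `Optional` is an assumption of this sketch, not part of the proposal.

```java
import java.util.Optional;
import java.util.logging.Logger;

// Hypothetical responses for a record that can never fit.
enum LargeRecordResponse { FAIL, SWALLOW }

// Hypothetical user handler; an empty Optional means "no opinion, use the config".
interface LargeRecordHandler {
    Optional<LargeRecordResponse> handleRecordTooLarge(String topic, int sizeBytes);
}

final class LargeRecordPolicy {
    private static final Logger LOG = Logger.getLogger(LargeRecordPolicy.class.getName());
    private final LargeRecordHandler handler;      // null when no handler is configured
    private final boolean dropInvalidLargeRecords; // value of drop.invalid.large.records

    LargeRecordPolicy(LargeRecordHandler handler, boolean dropInvalidLargeRecords) {
        this.handler = handler;
        this.dropInvalidLargeRecords = dropInvalidLargeRecords;
    }

    LargeRecordResponse decide(String topic, int sizeBytes) {
        // 1. An explicit answer from the handler wins over the config.
        Optional<LargeRecordResponse> fromHandler = handler == null
            ? Optional.<LargeRecordResponse>empty()
            : handler.handleRecordTooLarge(topic, sizeBytes);
        // 2. Otherwise fall back to the boolean config.
        LargeRecordResponse response = fromHandler.orElse(
            dropInvalidLargeRecords ? LargeRecordResponse.SWALLOW : LargeRecordResponse.FAIL);
        // 3. Swallowing is never silent: log before dropping.
        if (response == LargeRecordResponse.SWALLOW) {
            LOG.warning("Dropping record of " + sizeBytes + " bytes for topic " + topic);
        }
        return response;
    }
}
```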

On Tue, May 14, 2024 at 4:45 PM Chris Egerton <chr...@aiven.io.invalid> wrote:

Hi Alieh,

Thank you for all the updates! One final question: how will the retry timeout for unknown topic partition errors be implemented? I think it would be best if this could be done with an implementation of the error handler, but I don't see a way to track the necessary information with the current ProducerExceptionHandler interface.

Cheers,

Chris

On Tue, May 14, 2024 at 9:10 AM Alieh Saeedi <asae...@confluent.io.invalid> wrote:

Thanks Andrew. Done :)

@Chris: I changed the config parameter type from boolean to integer, which defines the timeout for retrying. I thought reusing `max.block.ms` was not reasonable, as you mentioned.

So if the KIP looks good, let's skip to the good part ;-) VOTING :)

Bests,
Alieh

On Tue, May 14, 2024 at 12:26 PM Andrew Schofield <andrew_schofi...@live.com> wrote:

Hi Alieh,
Just one final comment.

[AJS5] Existing classes use Retriable, not Retryable. For example:
https://kafka.apache.org/21/javadoc/org/apache/kafka/common/errors/RetriableException.html

I suggest RetriableResponse and NonRetriableResponse.

Thanks,
Andrew

On 13 May 2024, at 23:17, Alieh Saeedi <asae...@confluent.io.INVALID> wrote:

Hi all,

Thanks for all the valid points you listed.

KIP updates and addressing concerns:

1) The KIP now suggests two Response types: `RetryableResponse` and `NonRetryableResponse`

2) `custom.exception.handler` is changed to `custom.exception.handler.class`

3) The KIP clarifies that `In the case of an implemented handler for the specified exception, the handler takes precedence.`

4) There is now a `default` implementation for both handle() methods.

5) @Chris: for `UnknownTopicOrPartition`, the default is already retrying for 60s (in fact, the default value of `max.block.ms`). If the handler instructs to FAIL or SWALLOW, there will be no retry, and if the handler instructs to RETRY, that will be the default behavior, which follows the values in already existing config parameters such as `max.block.ms`. Does that make sense?

Hope the changes and explanations are convincing :)

Cheers,

Alieh

On Mon, May 13, 2024 at 6:40 PM Justine Olshan <jols...@confluent.io.invalid> wrote:

Oh I see. The type isn't the error type but a newly defined type for the response. Makes sense and works for me.

Justine

On Mon, May 13, 2024 at 9:13 AM Chris Egerton <fearthecel...@gmail.com> wrote:

If we have dedicated methods for each kind of exception (handleRecordTooLarge, handleUnknownTopicOrPartition, etc.), doesn't that provide sufficient constraint?
I'm not suggesting we eliminate these methods, just that we change their return types to something more flexible.

On Mon, May 13, 2024, 12:07 Justine Olshan <jols...@confluent.io.invalid> wrote:

I'm not sure I agree with the RetriableResponse and NonRetriableResponse comment. This doesn't limit the blast radius or enforce that certain errors are used. I think we might disagree on how controlled these interfaces can be...

Justine

On Mon, May 13, 2024 at 8:40 AM Chris Egerton <chr...@aiven.io.invalid> wrote:

Hi Alieh,

Thanks for the updates! I just have a few more thoughts:

- I don't think a boolean property is sufficient to dictate retries for unknown topic partitions, though. These errors can occur if a topic has just been created, which can occur if, for example, automatic topic creation is enabled for a multi-task connector. This is why I proposed a timeout instead of a boolean (and see my previous email for why reducing max.block.ms for a producer is not a viable alternative).
If it helps, one way to reproduce this yourself is to add the line `fooProps.put(TASKS_MAX_CONFIG, "10");` to the integration test here:
https://github.com/apache/kafka/blob/5439914c32fa00d634efa7219699f1bc21add839/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java#L134
and then check the logs afterward for messages like "Error while fetching metadata with correlation id <n> : {foo-topic=UNKNOWN_TOPIC_OR_PARTITION}".

- I also don't think we need custom XxxResponse enums for every possible method; it seems like this will lead to a lot of duplication and cognitive overhead if we want to expand the error handler in the future. Something more flexible like RetriableResponse and NonRetriableResponse could suffice.

- Finally, the KIP still doesn't state how the handler will or won't take precedence over existing retry properties. If I set `retries` or `delivery.timeout.ms` or `max.block.ms` to low values, will that cause retries to cease even if my custom handler would otherwise keep returning RETRY for an error?

Cheers,

Chris

On Mon, May 13, 2024 at 11:02 AM Andrew Schofield <andrew_schofi...@live.com> wrote:

Hi Alieh,
Just a few more comments on the KIP. It is looking much less risky now that the scope is tighter.

[AJS1] It would be nice to have default implementations of the handle methods so an implementor would not need to implement both themselves.

[AJS2] Producer configurations which are class names usually end in “.class”. I suggest “custom.exception.handler.class”.

[AJS3] If I implemented a handler, and I set a non-default value for one of the new configurations, what happens? I would expect that the handler takes precedence. I wasn’t quite clear what “the control will follow the handler instructions” meant.

[AJS4] Because you now have an enum for the RecordTooLargeExceptionResponse, I don’t think you need to state in the comment for ProducerExceptionHandler that RETRY will be interpreted as FAIL.

Thanks,
Andrew

On 13 May 2024, at 14:53, Alieh Saeedi <asae...@confluent.io.INVALID> wrote:

Hi all,

Thanks for the very interesting discussion during my PTO.

KIP updates and addressing concerns:

1) Two handle() methods are defined in ProducerExceptionHandler for the two exceptions, with different input parameters, so that we have handle(RecordTooLargeException e, ProducerRecord record) and handle(UnknownTopicOrPartitionException e, ProducerRecord record).

2) The ProducerExceptionHandler extends `Closeable` as well.

3) The KIP suggests having two more configuration parameters with boolean values:

- `drop.invalid.large.records` with a default value of `false`, for swallowing too-large records.

- `retry.unknown.topic.partition` with a default value of `true`, which performs RETRY for `max.block.ms` ms when encountering the UnknownTopicOrPartitionException.

Hope the main concerns are addressed so that we can go forward with voting.

Cheers,

Alieh

On Thu, May 9, 2024 at 11:25 PM Artem Livshits <alivsh...@confluent.io.invalid> wrote:

Hi Matthias,

> [AL1] While I see the point, I would think having a different callback for every exception might not really be elegant?

I'm not sure how to assess the level of elegance of the proposal, but I can comment on the technical characteristics:

1. Having specific interfaces that codify the logic that is currently prescribed in the comments reduces the chance of making a mistake. Comments may get ignored, misunderstood, etc., but if the contract is codified, the compiler will help to enforce the contract.
2. Given that the logic is trickier than it seems (record-too-large is an example that can easily confuse someone who's not intimately familiar with the nuances of the batching logic), having a few more hoops to jump through would give a greater chance that whoever tries to add a new case pauses and thinks a bit more.
3. As Justine pointed out, having different methods will be a forcing function to go through a KIP rather than smuggle new cases in through the implementation.
4. Sort of a consequence of the previous 3: all those things reduce the chance of someone writing code that works with 2 errors and then, when more errors are added in the future, suddenly and incorrectly ignores the new errors (the example I gave in the previous email).

> [AL2 cont.] Similar to AL1, I see such a handler to some extent as business logic.
> If a user puts a bad filter condition in their KS app, and drops messages

I agree that there is always a chance of a bug that loses messages, but there is generally a separation of concerns with different risk profiles: the filtering logic may be more rigorously tested and rarely changed (say, an application developer does it), but setting the topics to produce to may be done via configuration (e.g. a user of the application does it), and it's generally expected that users get an error when the configuration is incorrect.

What could be worse is that UnknownTopicOrPartitionException can be an intermittent error, i.e. with a generally correct configuration, there could be a metadata propagation problem on the cluster and then a random set of records could get lost.

> [AL3] Maybe I misunderstand what you are saying, but to me, checking the size of the record upfront is exactly what the KIP proposes? No?

It achieves the same result but solves it differently. My proposal:

1. The application checks the validity of a record (maybe via a new validateRecord method) before producing it, and can just exclude it or return an error to the user.
2. The application produces the record. At this point there are no records that could return record-too-large: they were either skipped at step 1, or we didn't get here because step 1 failed.

Vs. the KIP's proposal:

1. The application produces the record.
2. The application gets a callback.
3. The application returns the action on how to proceed.

The advantage of the former is the clarity of semantics: the record is invalid (a property of the record, not a function of server state or server configuration), and we can clearly know that it is the record that is bad and can never succeed.

The KIP-proposed way actually has a very tricky point: it handles only a subset of record-too-large exceptions. The broker can return record-too-large and reject the whole batch (but we don't want to ignore those, because then we could skip random records that just happened to be in the same batch). In some sense we use the same error for 2 different conditions, and understanding that requires a pretty deep understanding of Kafka internals.
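Artem's validate-before-produce flow might look roughly like this. `validateRecord` is only floated in the email, so its signature here is hypothetical, as is the size check: it counts only UTF-8 key and value bytes against an assumed limit, whereas a real producer's size check also involves headers and record/batch overhead. It also does not cover the broker rejecting a whole batch, which is the separate condition Artem warns about.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical verdict from a pre-send check: a property of the record alone.
enum Validity { VALID, TOO_LARGE }

final class RecordValidator {
    private final int maxRecordBytes; // assumed limit, e.g. derived from max.request.size

    RecordValidator(int maxRecordBytes) {
        this.maxRecordBytes = maxRecordBytes;
    }

    // Simplified estimate: key + value bytes only; real records add headers and framing.
    Validity validateRecord(String key, String value) {
        int size = utf8Length(key) + utf8Length(value);
        return size > maxRecordBytes ? Validity.TOO_LARGE : Validity.VALID;
    }

    private static int utf8Length(String s) {
        return s == null ? 0 : s.getBytes(StandardCharsets.UTF_8).length;
    }
}

// Step 1 validates, step 2 "produces"; a list stands in for producer.send(...).
final class ValidatingApp {
    final List<String> sentKeys = new ArrayList<>();
    final List<String> rejectedKeys = new ArrayList<>();
    private final RecordValidator validator;

    ValidatingApp(RecordValidator validator) {
        this.validator = validator;
    }

    void produce(String key, String value) {
        if (validator.validateRecord(key, value) == Validity.TOO_LARGE) {
            rejectedKeys.add(key); // the app decides: skip it or report an error to the user
            return;
        }
        sentKeys.add(key); // only records that passed step 1 ever reach the producer
    }
}
```

Because the verdict depends only on the record, an invalid record is rejected before any callback or server round trip is involved.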

-Artem

On Wed, May 8, 2024 at 9:47 AM Justine Olshan <jols...@confluent.io.invalid> wrote:

My concern with respect to it being fragile: the code that ensures the error type is internal to the producer. Someone may see it and say, "I want to add such and such error. This looks like internal code, so I don't need a KIP", and then they can change it to whatever they want, thinking it is within the typical Kafka improvement protocol.

Relying on an internal change to enforce an external API is fragile in my opinion. That's why I sort of agreed with Artem on enforcing the error in the method signature, which is part of the public API.

Chris's comments on requiring more information for the handler again make me wonder if we are solving a problem of lack of information at the application level with a more powerful solution than we need. (I.e., if we had more information, could the application close and restart the transaction rather than having to drop records?) But I am happy to compromise with a handler that we can agree is sufficiently controlled and documented.
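The idea Justine and Artem describe, encoding the allowed responses in the method signature so the compiler enforces them, can be sketched as follows. This is not the KIP's final interface: the stand-in exception classes, the enum names, the `String topic` parameter, and the chosen defaults are assumptions made so the example compiles without Kafka on the classpath.

```java
import java.util.Set;

// Stand-ins for Kafka's exception types so the sketch compiles on its own.
class RecordTooLargeError extends RuntimeException {
    RecordTooLargeError(String message) { super(message); }
}

class UnknownTopicOrPartitionError extends RuntimeException {
    UnknownTopicOrPartitionError(String message) { super(message); }
}

// A record that is too large can never succeed, so there is no RETRY constant.
enum RecordTooLargeResponse { FAIL, SWALLOW }

// Unknown topic/partition may be transient (e.g. metadata propagation), so RETRY is allowed.
enum UnknownTopicOrPartitionResponse { FAIL, SWALLOW, RETRY }

// One method per exception; adding a new case means changing this public interface.
interface ProducerExceptionHandlerSketch extends AutoCloseable {
    default RecordTooLargeResponse handle(RecordTooLargeError e, String topic) {
        return RecordTooLargeResponse.FAIL; // assumed default: keep fail-fast behavior
    }

    default UnknownTopicOrPartitionResponse handle(UnknownTopicOrPartitionError e, String topic) {
        return UnknownTopicOrPartitionResponse.RETRY; // assumed default: keep retrying
    }

    @Override
    default void close() { }
}

// Example: only drop oversized records for topics that are not "important".
final class ImportantTopicsHandler implements ProducerExceptionHandlerSketch {
    private final Set<String> importantTopics;

    ImportantTopicsHandler(Set<String> importantTopics) {
        this.importantTopics = importantTopics;
    }

    @Override
    public RecordTooLargeResponse handle(RecordTooLargeError e, String topic) {
        // Returning RecordTooLargeResponse.RETRY here would be a compile error.
        return importantTopics.contains(topic)
            ? RecordTooLargeResponse.FAIL
            : RecordTooLargeResponse.SWALLOW;
    }
}
```

Because `RecordTooLargeResponse` has no `RETRY` constant, a "RETRY is interpreted as FAIL" javadoc note becomes unnecessary: such a return value simply does not compile.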

Justine

On Wed, May 8, 2024 at 7:20 AM Chris Egerton <chr...@aiven.io.invalid> wrote:

Hi Alieh,

Continuing prior discussions:

1) Regarding the "flexibility" discussion, my overarching point is that I don't see the point in allowing for this kind of pluggable logic without also covering more scenarios. Take example 2 in the KIP: if we're going to implement retries only on "important" topics when a topic partition isn't found, why wouldn't we also want to be able to do this for other errors? Again, taking authorization errors as an example, why wouldn't we want to be able to fail when we can't write to "important" topics because the producer principal lacks sufficient ACLs, and drop the record if the topic isn't "important"? In a security-conscious environment with runtime-dependent topic routing (which is a common feature of many source connectors, such as the Debezium connectors), this seems fairly likely.

2) As far as changing the shape of the API goes, I like Artem's idea of splitting out the interface based on specific exceptions.
This may be a little laborious to expand in the future, but if we really want to limit the exceptions that we cover with the handler and move slowly and cautiously, then IMO it'd be reasonable to reflect that in the interface. I also acknowledge that there's no way to completely prevent people from shooting themselves in the foot by implementing the API incorrectly, but I think it's worth it to do what we can (including leveraging the Java language's type system) to help them, so IMO there's value in eliminating the implicit behavior of failing when a policy returns RETRY for a non-retriable error. This can take a variety of shapes and I'm not going to insist on anything specific, but I do want to again raise my concerns with the current proposal and request that we find something a little better.

3) Concerning the default implementation: actually, I meant what I wrote :) I don't want a "second" default; I want an implementation of this interface to be used as the default if no others are specified.
The behavior of this default implementation would be identical to existing behavior (so there would be no backwards compatibility concerns like the ones raised by Matthias), but it would be possible to configure this default handler class to behave differently for a basic set of scenarios. This would mirror (pun intended) the approach we've taken with MirrorMaker 2 and its ReplicationPolicy interface [1]. There is a default implementation available [2] that recognizes a handful of basic configuration properties [3] for simple tweaks, but if users want, they can also implement their own replication policy for more fine-grained logic if those properties aren't flexible enough.

More concretely, I'm imagining something like this for the producer exception handler:

- Default implementation class: org.apache.kafka.clients.producer.DefaultProducerExceptionHandler
- This class would recognize two properties:
  - drop.invalid.large.records: Boolean property, defaults to false. If "false", causes the handler to return FAIL whenever a RecordTooLargeException is encountered; if "true", causes SWALLOW/SKIP/DROP to be returned instead
  - unknown.topic.partition.retry.timeout.ms: Integer property, defaults to INT_MAX. Whenever an UnknownTopicOrPartitionException is encountered, causes the handler to return FAIL if that record has been pending for more than the retry timeout; otherwise, causes RETRY to be returned

I think this is worth addressing now instead of later because it forces us to evaluate the usefulness of this interface, and it addresses a long-standing issue not just with Kafka Connect, but with the Java producer in general. For reference, here are a few tickets I collected after briefly skimming our Jira showing that this is a real pain point for users:
https://issues.apache.org/jira/browse/KAFKA-10340,
https://issues.apache.org/jira/browse/KAFKA-12990,
https://issues.apache.org/jira/browse/KAFKA-13634.
Although this is frequently reported with Kafka Connect, it applies to anyone who configures a producer to use a high retry timeout.
I am aware of the max.block.ms property, but it's painful and IMO poor behavior to require users to reduce the value of this property just to handle the single scenario of trying to write to topics that don't exist, since it would also limit the retry timeout for other operations that are legitimately retriable.

Raising new points:

5) I don't see the interplay between this handler and existing retry-related properties mentioned anywhere in the KIP. I'm assuming that properties like "retries", "max.block.ms", and "delivery.timeout.ms" would take precedence over the handler, and once they are exhausted, the record/batch will fail no matter what? If so, it's probably worth briefly mentioning this (no more than a sentence or two) in the KIP, and if not, I'm curious what you have in mind.

6) I also wonder if the API provides enough information in its current form. Would it be possible to provide handlers with some way of tracking how long a record has been pending (i.e., how long it's been since the record was provided as an argument to Producer::send)?
One way to do this could be to add a method like `onNewRecord(ProducerRecord)` and allow/require the handler to do its own bookkeeping, probably with a matching `onRecordSuccess(ProducerRecord)` method so that the handler doesn't eat up an ever-increasing amount of memory trying to track records. An alternative could be to include information about the initial time the record was received by the producer and the number of retries that have been performed for it as parameters in the handle method(s), but I'm not sure how easy this would be to implement, and it might clutter things up a bit too much.

7) A small request: can we add Closeable (or, if you prefer, AutoCloseable) as a superinterface for the handler interface?
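Chris's proposed default handler (points 3 and 6) could be sketched as below, combining the two properties with the `onNewRecord`/`onRecordSuccess` bookkeeping idea and `Closeable`. Everything here is hypothetical: `DefaultHandlerSketch` and `HandlerResponse` are invented, records are identified by a caller-assigned `long` id instead of a `ProducerRecord`, and the clock is injected so the timeout logic can be exercised deterministically.

```java
import java.io.Closeable;
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical response type shared by both handle methods in this sketch.
enum HandlerResponse { FAIL, SWALLOW, RETRY }

// Hypothetical default handler configured by the two properties named in the email.
final class DefaultHandlerSketch implements Closeable {
    private final boolean dropInvalidLargeRecords;
    private final long unknownTopicRetryTimeoutMs;
    private final LongSupplier clockMs;
    // recordId -> time (ms) the record was handed to send(); cleared on completion.
    private final Map<Long, Long> pendingSince = new HashMap<>();

    DefaultHandlerSketch(Map<String, String> configs, LongSupplier clockMs) {
        this.dropInvalidLargeRecords = Boolean.parseBoolean(
            configs.getOrDefault("drop.invalid.large.records", "false"));
        this.unknownTopicRetryTimeoutMs = Long.parseLong(
            configs.getOrDefault("unknown.topic.partition.retry.timeout.ms",
                String.valueOf(Integer.MAX_VALUE)));
        this.clockMs = clockMs;
    }

    // Bookkeeping hooks in the spirit of onNewRecord / onRecordSuccess.
    void onNewRecord(long recordId) {
        pendingSince.putIfAbsent(recordId, clockMs.getAsLong());
    }

    void onRecordSuccess(long recordId) {
        pendingSince.remove(recordId);
    }

    HandlerResponse handleRecordTooLarge(long recordId) {
        pendingSince.remove(recordId); // the record is finished either way
        return dropInvalidLargeRecords ? HandlerResponse.SWALLOW : HandlerResponse.FAIL;
    }

    HandlerResponse handleUnknownTopicOrPartition(long recordId) {
        long now = clockMs.getAsLong();
        // A record never registered via onNewRecord is treated as having just arrived.
        long since = pendingSince.getOrDefault(recordId, now);
        if (now - since > unknownTopicRetryTimeoutMs) {
            pendingSince.remove(recordId);
            return HandlerResponse.FAIL;
        }
        return HandlerResponse.RETRY;
    }

    int pendingCount() {
        return pendingSince.size();
    }

    @Override
    public void close() {
        pendingSince.clear();
    }
}
```

Removing entries on success, on FAIL, and on `close()` is what keeps the map from growing without bound, which is the memory concern raised in point 6.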

[1] - https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/ReplicationPolicy.html
[2] - https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html
[3] - https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html#SEPARATOR_CONFIG,
      https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.html#INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG

Cheers,

Chris

On Wed, May 8, 2024 at 12:37 AM Matthias J. Sax <mj...@apache.org> wrote:

Very interesting discussion.

It seems a central point is the question of "how generic" our approach should be: some people think we need to be conservative, and others think we should try to be as generic as possible.
Personally, I think following a limited scope for this KIP (by explicitly saying we only cover RecordTooLarge and UnknownTopicOrPartition) might be better. We have concrete tickets that we address, while for other exceptions (like authorization) we don't know if people want to handle them to begin with. Boiling the ocean might not get us too far, and being somewhat pragmatic might help to move this KIP forward. -- I also agree with Justine and Artem that we want to be careful anyway not to allow users to break stuff too easily.

At the same time, I agree that we should set up this change in a forward-looking way, and thus having a single generic handler allows us to extend the handler more easily later. This should also simplify follow-up KIPs that might add new error cases. (I actually mentioned one more to Alieh already, but we both agreed that it might be best to exclude it from the KIP right now, to make the 3.8 deadline. Doing a follow-up KIP is not the end of the world.)

@Chris:

(2) This sounds fair to me, but I'm not sure how "bad" it actually would be?
If the contract is clearly defined, what the KIP proposes seems fine, and given that such a handler is an expert API and we can provide "best practices" (cf. my other comment below in [AL1]), being a little bit pragmatic sounds fine to me.

Not sure if I understand Justine's argument on this question?

(3) About having a default impl or not. I am fine with adding one, even if I am not sure why Connect could not just add its own one and plug it in (and we would add corresponding configs for Connect, but not for the producer itself)? For this case, we could also do this as a follow-up KIP, but I'm happy to include it in this KIP to provide value to Connect right away (even if the value might not come right away if we miss the 3.8 deadline due to expanded KIP scope...) -- For KS, we would for sure plug in our own impl, and lock down the config such that users cannot set their own handler on the internal producer to begin with. Might be good to elaborate why the producer should have a default? We might actually want to add this to the KIP right away?
The key for a default impl would be to not change the current behavior, and having no default seems to achieve this. For the two cases you mentioned, it's unclear to me what default value on "upper bound on retries" for UnknownTopicOrPartitionException we should set? Seems it would need to be the same as `delivery.timeout.ms`? However, if users have actually overridden `delivery.timeout.ms`, we would need to set this config somewhat "dynamically"? Is this feasible? If we hard-code 2 minutes, it might not be backward compatible. I have the impression we might introduce some undesired coupling? -- For the "record too large" case, the config seems to be boolean, and setting it to `false` by default seems to provide backward compatibility.

@Artem:

[AL1] While I see the point, I would think having a different callback for every exception might not really be elegant? In the end, the handler is a very advanced feature anyway, and if it's implemented in a bad way, well, it's a user error -- we cannot protect users from everything.
To me, a handler like this is to some extent "business logic", and if a user gets business logic wrong, it's hard to protect them. -- We would of course provide best-practice guidance in the JavaDocs, and explain that a handler should have explicit `if` statements for the stuff it wants to handle, and only a single default which returns FAIL.

[AL2] Yes, but for KS we would retry at the application layer. I.e., the TX is not completed, we clean up and set up our task from scratch, to ensure the pending transaction is completed before we resume. If the TX was indeed aborted, we would retry from an older offset and thus just hit the same error again, and the loop begins again.

[AL2 cont.] Similar to AL1, I see such a handler to some extent as business logic. If a user puts a bad filter condition in their KS app and drops messages, there is nothing we can do about it, and this handler, IMHO, has a similar purpose.
This is also the line of thinking I apply to EOS, to address Justine's concern about "should we allow to drop for EOS", and my answer is "yes", because it's more business logic than actual error handling IMHO. And by default, we fail... So users opt in to add business logic to drop records. It's an application-level decision how to write the code.

[AL3] Maybe I misunderstand what you are saying, but to me, checking the size of the record upfront is exactly what the KIP proposes? No?

@Justine:

> I saw the sample code -- is it just an if statement checking for the error before the handler is invoked? That seems a bit fragile.

What do you mean by fragile? Not sure if I see your point.

-Matthias

On 5/7/24 5:33 PM, Artem Livshits wrote:

Hi Alieh,

Thanks for the KIP. The motivation talks about very specific cases, but the interface is generic.

[AL1]
If the interface evolves in the future, I think we could have the following confusion:

1. A user implemented a SWALLOW action for both RecordTooLargeException and UnknownTopicOrPartitionException. For simplicity they just return SWALLOW from the function, because it elegantly handles all known cases.
2. The interface has evolved to support a new exception.
3. The user has upgraded their Kafka client.

Now a new kind of error gets dropped on the floor without the user's intention, and it would be super hard to detect and debug.

To avoid the confusion, I think we should use handlers for specific exceptions. Then if a new exception is added, it won't get silently swallowed, because the user would need to add new functionality to handle it.
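The failure mode in [AL1], and the "explicit `if` per handled exception, single FAIL default" practice Matthias describes in his reply, can be sketched in a few lines of Java. This is a minimal illustration under assumptions: `Response`, `RecordErrorHandler`, and `Handlers` are hypothetical types modeled on the discussion, not Kafka API; the two exception classes are local stand-ins for the Kafka types of the same name so the sketch compiles on its own; and `SomeFutureException` plays the role of an exception a later client release might start routing to the handler.

```java
// Local stand-ins for the Kafka exception types named in the thread.
class RecordTooLargeException extends RuntimeException {}
class UnknownTopicOrPartitionException extends RuntimeException {}

// Hypothetical: an exception type a future client version might pass to the handler.
class SomeFutureException extends RuntimeException {}

// Hypothetical response type and handler shape, loosely modeled on the KIP discussion.
enum Response { FAIL, SWALLOW }

interface RecordErrorHandler {
    Response handle(Object record, Exception exception);
}

final class Handlers {
    // The footgun from [AL1]: swallows whatever the client passes in,
    // including exception types added in a later release.
    static final RecordErrorHandler CATCH_ALL = (record, exception) -> Response.SWALLOW;

    // Recommended shape: one explicit branch per handled type, single FAIL default.
    static final RecordErrorHandler EXPLICIT = (record, exception) -> {
        if (exception instanceof RecordTooLargeException) {
            return Response.SWALLOW; // deliberately drop oversized records
        }
        // Further explicit branches would go here, one per type the user opts into.
        return Response.FAIL; // anything not explicitly handled fails, as today
    };

    private Handlers() {}
}
```

With `CATCH_ALL`, an exception type introduced later is swallowed without any code change on the user's side; with `EXPLICIT`, it falls through to `FAIL`, so the new case surfaces as an error instead of silently dropping data.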
I also have some higher-level comments:

[AL2]
> it throws a TimeoutException, and the user can only blindly retry, which may result in an infinite retry loop

If the TimeoutException happens during transactional processing (exactly once is the desired semantics), then the client should not retry when it gets a TimeoutException, because without knowing the reason for the TimeoutException, the client cannot know whether the message actually got produced or not, and retrying the message may result in duplicates.

> The thrown TimeoutException "cuts" the connection to the underlying root cause of missing metadata

Maybe we should fix the error handling and return the proper underlying message? Then the application can properly handle the message based on its preferences.

From the product perspective, it's not clear how safe it is to blindly ignore UnknownTopicOrPartitionException.
This could lead to situations where a simple typo could lead to massive data loss (part of the data would effectively be produced to a "black hole", and the user may not notice it for a while).

In which situations would you recommend the user to "black hole" messages in case of misconfiguration?

[AL3]

> If the custom handler decides on SWALLOW for RecordTooLargeException,

Is my understanding correct that this KIP proposes functionality that would only be able to SWALLOW a RecordTooLargeException that happens because the producer cannot produce the record (if the broker rejects the batch, the error won't get to the handler, because we cannot know which other records get ignored)? In this case, why not just check the locally configured max record size upfront and not produce the record in the first place? Maybe we can expose a validation function from the producer that could validate the records locally, so we don't need to produce the record in order to know that it's invalid.
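A rough version of the upfront check suggested in [AL3] can be written on the application side today. The sketch below is hypothetical (`LocalSizeCheck` is not a producer API) and assumes keys and values are serialized as UTF-8 strings, as the default `StringSerializer` does. It compares only the serialized key and value bytes against the real producer config `max.request.size` (default 1048576 bytes) and ignores per-record and batch overhead and headers, so it under-estimates what the producer itself checks: a `true` result means the record is certain to be rejected, while `false` does not guarantee acceptance.

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;

// Hypothetical application-side pre-check, not a producer API.
final class LocalSizeCheck {
    static final String MAX_REQUEST_SIZE = "max.request.size";
    static final int DEFAULT_MAX_REQUEST_SIZE = 1024 * 1024; // producer default: 1048576 bytes

    private final int limitBytes;

    LocalSizeCheck(Properties producerConfig) {
        String configured = producerConfig.getProperty(MAX_REQUEST_SIZE);
        this.limitBytes = configured == null
                ? DEFAULT_MAX_REQUEST_SIZE
                : Integer.parseInt(configured.trim());
    }

    // Serialized payload size, assuming UTF-8 string serialization; null counts as 0 bytes.
    static int payloadBytes(String key, String value) {
        int k = key == null ? 0 : key.getBytes(StandardCharsets.UTF_8).length;
        int v = value == null ? 0 : value.getBytes(StandardCharsets.UTF_8).length;
        return k + v;
    }

    // True if the payload alone already exceeds the limit. Overhead only adds
    // bytes, so such a record would certainly fail the producer's size check.
    boolean definitelyTooLarge(String key, String value) {
        return payloadBytes(key, value) > limitBytes;
    }

    int limitBytes() { return limitBytes; }
}
```

A producer-provided validation function, as proposed above, could use the producer's own size estimate instead of this lower bound.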
-Artem

On Tue, May 7, 2024 at 2:07 PM Justine Olshan <jols...@confluent.io.invalid> wrote:

Alieh and Chris,

Thanks for clarifying 1), but I saw the motivation. I guess I just didn't understand how that would be ensured on the producer side. I saw the sample code -- is it just an if statement checking for the error before the handler is invoked? That seems a bit fragile.

Can you clarify what you mean by `since the code does not reach the KS interface and breaks somewhere in producer.` If we surfaced this error to the application in a better way, would that also be a solution to the issue?

Justine

On Tue, May 7, 2024 at 1:55 PM Alieh Saeedi <asae...@confluent.io.invalid> wrote:

Hi,