Hi all, Thanks for keeping the momentum of our discussion going.
I see a rough consensus on the main points. It seems that we agreed on the following:
1) Define `commitTnx(commitOptions)` so that it clears the error.
2) Require the user to explicitly call `flush()` before `commitTnx(commitOptions)` if they decide to ignore errors.
I updated the KIP with these points. Please take a look. I am sure it is not perfect yet, and there are (or will be) some open questions, but if you agree, I will open the vote as well. Of course, the discussion can still continue in this thread.
Cheers,
Alieh
On Tue, Jun 25, 2024 at 11:36 AM Chris Egerton <fearthecel...@gmail.com> wrote:
> Hi Artem, > > Yes, I completely agree that by default, special action shouldn't be > required from users to prevent transactions from being committed when one > or more records can't be sent. The behavior I was suggesting was only > relevant to the new API we're discussing where we allow users to > intentionally bypass that logic when invoking commitTransaction. > > Cheers, > > Chris > > On Tue, Jun 25, 2024, 01:44 Artem Livshits <alivsh...@confluent.io > .invalid> > wrote: > > > Hey folks, > > > > Great discussion! > > > > Re: throwing exceptions from send(). send() is documented to throw > > KafkaException, so if the application doesn't handle it, it should be a > > bug. Now, it does have a note that API exceptions wouldn't be thrown, > not > > sure if we have code that relies on that. There is a reason exceptions > > have classes, they are designed to express a "class of errors" that can > be > > handled, so that we don't have to add a flag or a new method every time > we > > have a new exception to throw. But if there is consensus that it's still > > too risky (especially if we have examples of code that gets broken), > then I > > agree that we shouldn't do it. > > > > Re: various ways to communicate semantics change.
If we must have 2 > > different behaviors, I think passing options to "ignore errors" to > > commitTransaction is probably the most intuitive way to do it. What I > > really don't like (in any of the options), is that we cannot really > > document it in a way that articulates a value in the product. There are > > tons of nuances that require understanding some buggy behavior, then > fixed > > behavior, then an option to sometimes turn on buggy behavior, and so on. > > > > > if a user invokes Producer::abortTransaction from within a producer > > callback today > > > > I think we would get an invalid state exception. Which we could probably > fix, > > but even if we supported it, I think it would be good if doing send + > > commit would lead to an aborted transaction without special action from the > > application -- the simple things should be really simple, any failure > > during send or commit should abort the send + commit sequence without special > > handling. > > > > -Artem > > > > On Mon, Jun 24, 2024 at 6:37 PM Chris Egerton <fearthecel...@gmail.com> > > wrote: > > > > > One quick thought: if a user invokes Producer::abortTransaction from > > within > > > a producer callback today, even in the midst of an ongoing call to > > > Producer::commitTransaction, what is the behavior? Would it be > reasonable > > > to support this behavior as a way to allow error handling to take place > > > during implicit flushes, via producer callback? > > > > > > On Mon, Jun 24, 2024 at 9:21 PM Matthias J. Sax <mj...@apache.org> > > wrote: > > > > > > > My point is that it does not seem to be safe to allow users to > ignore > > > > errors with an implicit flush, and I think it's better to only allow > it > > > > with (ie, after) an explicit flush(). > > > > > > > > My reasoning is that users should make a decision to ignore errors > or > > > > not, before calling `commitTx()`, but after inspecting all potential > > > > send errors.
With an implicit flush, users need to "blindly" decide > to > > > > ignore send errors, because there are pending sends and potential > > errors > > > > are not yet known when calling `commitTx()`. > > > > > > > > > > > > > > > > > In the documentation of commitTransaction, we say if any send > throws > > an > > > > > error, commitTransaction will too. > > > > > > > > Yes. And I think we should keep it this way for an implicit flush. > With > > > > an explicit flush, `commitTransaction()` cannot encounter any send > > > > errors any longer. > > > > > > > > > > > > > > > > > It says that all callbacks will be executed, but we ignore the > errors > > > of > > > > > the callbacks. > > > > > > > > Ah. Thanks for pointing this out. For this case it's even worse (for > > > > case (2)), because the user cannot inspect any errors and make any > > > > decision to ignore or not during an implicit flush... > > > > > > > > > > > > > > > > > We shouldn't be relying on errors in the callback unless we are > > > > > calling flush, which we can still do. It seems this has always been > > the > > > > > case as well. > > > > > > > > Yes, it has always been this way, and my point is to keep it this way > > > > (option (2) would change it), and not start allowing errors to be ignored > > > > with an implicit flush. > > > > > > > > > > > > > > > > -Matthias > > > > > > > > > > > > > > > > On 6/24/24 4:57 PM, Justine Olshan wrote: > > > > > Transaction verification is a concept from KIP-890 referring to the > > > > > verification that a partition has been added to the transaction. > It's > > > > not a > > > > > huge deal, but maybe we don't want to overload the terminology.
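Matthias's rule above (honor "ignore errors" only with, i.e. after, an explicit `flush()`) is option (3a)/(3b) from later in the thread. Below is a minimal, self-contained Java sketch of those semantics, assuming a toy producer. `OptionThreeProducer`, `commitTx(boolean ignoreErrors)`, and the rule that records starting with `bad` fail are hypothetical illustrations. They are not the `KafkaProducer` API or the KIP's final design.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of option (3). All names are hypothetical; this is not the
// KafkaProducer API and not the KIP's final design.
class OptionThreeProducer {
    private final List<String> delivered = new ArrayList<>();
    private final List<String> pending = new ArrayList<>();
    private RuntimeException lastSendError;   // recorded by a failed send
    private boolean flushedSinceLastSend;     // true only after flush() with no later send()

    void send(String rec) {
        pending.add(rec);
        flushedSinceLastSend = false;
    }

    // "Delivers" pending records; in this toy, records starting with "bad" fail.
    void flush() {
        for (String r : pending) {
            if (r.startsWith("bad")) {
                lastSendError = new RuntimeException("send failed: " + r);
            } else {
                delivered.add(r);
            }
        }
        pending.clear();
        flushedSinceLastSend = true;
    }

    List<String> commitTx(boolean ignoreErrors) {
        boolean explicitFlush = flushedSinceLastSend; // captured before the implicit flush
        flush(); // implicit flush, as commitTransaction() does today
        // 3a: honor ignoreErrors only after an explicit flush.
        // 3b: without one, disregard the flag and surface the error.
        if (lastSendError != null && !(ignoreErrors && explicitFlush)) {
            throw lastSendError;
        }
        return new ArrayList<>(delivered);
    }

    static String demo(boolean explicitFlush) {
        OptionThreeProducer p = new OptionThreeProducer();
        p.send("good-1");
        p.send("bad-2");
        if (explicitFlush) {
            p.flush(); // the application could inspect send results at this point
        }
        try {
            return "committed " + p.commitTx(true);
        } catch (RuntimeException e) {
            return "threw: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(true));   // case 3a
        System.out.println(demo(false));  // case 3b
    }
}
```

In this toy, the explicit `flush()` is what gives the application a chance to look at send results before deciding. Without it, the flag behaves as if it were not set.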
> > > > > > > > > > For option 2, I was a little confused by this > > > > > > > > > >> when commitTx is called, there are still pending Futures and not > > > > > all Callbacks are executed yet -- with the implicit flush, we know > > that > > > > > all Callbacks are executed, but even for this case, the user could > > only > > > > > throw an exception inside the Callback to stop the TX from eventually > > > > > committing -- Futures cannot be used to make a decision to ignore error > > and > > > > > commit or not. > > > > > > > > > > In the documentation of commitTransaction, we say if any send > throws > > an > > > > > error, commitTransaction will too. > > > > > > > > > > *Further, if any of the {@link #send(ProducerRecord)} calls which > > were > > > > part > > > > > of the transaction hit irrecoverable errors, this method will throw > > the > > > > > last received exception immediately and the transaction will not be > > > > > committed.* > > > > > > > > > > It says that all callbacks will be executed, but we ignore the > errors > > > of > > > > > the callbacks. > > > > > > > > > > *If the transaction is committed successfully and this method > returns > > > > > without throwing an exception, it is guaranteed that all {@link > > > Callback > > > > > callbacks} for records in the transaction will have been invoked > and > > > > > completed. Note that exceptions thrown by callbacks are ignored; > the > > > > > producer proceeds to commit the transaction in any case.* > > > > > > > > > > Is it fair to say though that for the send errors, we can choose to > > > > ignore > > > > > them? I didn't understand where the callbacks come in with your > > > > > comment. We shouldn't be relying on errors in the callback unless > we > > > are > > > > > calling flush, which we can still do. It seems this has always been > > the > > > > > case as well.
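The Javadoc quoted above says that exceptions thrown by callbacks are ignored and the commit proceeds anyway. A callback therefore cannot act as a veto during the implicit flush. The toy sketch below models that documented behavior. `CallbackIgnoringProducer` and `ToyCallback` are hypothetical; `ToyCallback` only loosely mirrors the shape of Kafka's `Callback#onCompletion(RecordMetadata, Exception)`.

```java
import java.util.ArrayList;
import java.util.List;

// Toy producer mimicking the documented rule: during commit, every callback
// runs, and exceptions thrown by callbacks are ignored.
class CallbackIgnoringProducer {
    private final List<String> records = new ArrayList<>();
    private final List<ToyCallback> callbacks = new ArrayList<>();
    int callbacksInvoked = 0;

    void send(String rec, ToyCallback callback) {
        records.add(rec);
        callbacks.add(callback);
    }

    boolean commitTransaction() {
        for (int i = 0; i < records.size(); i++) {
            callbacksInvoked++;
            try {
                callbacks.get(i).onCompletion(records.get(i), null); // toy: every send succeeds
            } catch (RuntimeException ignored) {
                // swallowed: a callback cannot veto the commit
            }
        }
        return true; // the transaction is committed in any case
    }

    static String demo() {
        CallbackIgnoringProducer p = new CallbackIgnoringProducer();
        p.send("r1", (rec, err) -> {
            throw new IllegalStateException("application wants to stop the commit");
        });
        p.send("r2", (rec, err) -> { });
        boolean committed = p.commitTransaction();
        return "committed=" + committed + ", callbacks=" + p.callbacksInvoked;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}

// Hypothetical stand-in for Kafka's Callback interface.
interface ToyCallback {
    void onCompletion(String recordName, Exception error);
}
```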
> > > > > > > > > > Justine > > > > > > > > > > On Mon, Jun 24, 2024 at 11:07 AM Andrew Schofield < > > > > andrew_schofi...@live.com> > > > > > wrote: > > > > > > > > > >> Agreed. Options 1 and 3 are safe. Option 2 is not. I’d be happy > with > > > 3a > > > > as > > > > >> the way. > > > > >> > > > > >> I suggest “TRANSACTION VERIFIED”. > > > > >> > > > > >> There isn’t really precedent for options in the producer API. We > > could > > > > use > > > > >> an enum, > > > > >> which is easy to use and not very future-proof. Or we could use a > > > class > > > > >> like the > > > > >> admin API does, which is cumbersome and flexible. > > > > >> > > > > >> CommitTransactionOptions.TRANSACTION_VERIFIED > > > > >> > > > > >> or > > > > >> > > > > >> public class CommitTransactionOptions { > > > > >> public CommitTransactionOptions(); > > > > >> > > > > >> CommitTransactionOptions transactionVerified(boolean > > > > >> transactionVerified); > > > > >> > > > > >> boolean transactionVerified(); > > > > >> } > > > > >> > > > > >> > > > > >> Then 3b is: > > > > >> > > > > >> send(…) > > > > >> send(…) > > > > >> flush() > > > > >> commitTransaction(new > > > > >> CommitTransactionOptions().transactionVerified(true)) > > > > >> > > > > >> > > > > >> I’d tend towards the enum here because I doubt we need as much > > > > flexibility > > > > >> as the admin API requires. > > > > >> > > > > >> Thanks, > > > > >> Andrew > > > > >> > > > > >> > > > > >>> On 24 Jun 2024, at 18:39, Matthias J. Sax <mj...@apache.org> > > wrote: > > > > >>> > > > > >>> I am ok either way (ie, flush or commit), but I think we need to > > > define > > > > >> exact semantics, and I think there is some subtle thing to > consider: > > > > >>> > > > > >>> > > > > >>> > > > > >>> 1) flush(Options) > > > > >>> > > > > >>> Example: > > > > >>> > > > > >>> send(...) > > > > >>> send(...) 
> > > > >>> > > > > >>> flush(ignoreErrors) > > > > >>> > > > > >>> // at this point, we know that all Futures are completed and > all > > > > >> Callbacks are executed, and we can assume that all user code > > checking > > > > for > > > > >> errors did execute, before `commitTx` is called > > > > >>> > > > > >>> // I consider this option safe > > > > >>> > > > > >>> commitTx() > > > > >>> > > > > >>> > > > > >>> 2) commitTx(Option) > > > > >>> > > > > >>> Example: > > > > >>> > > > > >>> send(...) > > > > >>> send(...) > > > > >>> > > > > >>> // when commitTx is called, there are still pending Futures and > > not > > > > all > > > > >> Callbacks are executed yet -- with the implicit flush, we know > that > > > all > > > > >> Callbacks are executed, but even for this case, the user could > only > > > > throw > > > > >> an exception inside the Callback to stop the TX from eventually > committing > > > -- > > > > >> Futures cannot be used to make a decision to ignore error and > commit > > > or > > > > not. > > > > >>> > > > > >>> // I do not consider this option safe > > > > >>> > > > > >>> commitTx(ignoreErrors) > > > > >>> > > > > >>> > > > > >>> > > > > >>> 3a) required flush + commitTx(Option) > > > > >>> > > > > >>> Example: > > > > >>> > > > > >>> send(...) > > > > >>> send(...) > > > > >>> > > > > >>> flush() > > > > >>> > > > > >>> // at this point, we know that all Futures are completed and all > > > > >> Callbacks are executed, and we can assume that all user code > > checking > > > > for > > > > >> errors did execute, before `commitTx` is called > > > > >>> > > > > >>> // I consider this option safe > > > > >>> > > > > >>> commitTx(ignoreErrors) > > > > >>> > > > > >>> > > > > >>> 3b) missing flush + commitTx(Option) > > > > >>> > > > > >>> Example: > > > > >>> > > > > >>> send(...) > > > > >>> send(...)
> > > > >>> > > > > >>> // as flush() was not called explicitly, we should ignore > > > > >> the `ignoreErrors` flag and always throw an exception if the producer > is > > > in > > > > >> an error state, because we cannot be sure that the user did all > > required > > > > checks > > > > >> for error handling > > > > >>> > > > > >>> commitTx(ignoreErrors) > > > > >>> > > > > >>> > > > > >>> > > > > >>> The only issue with option (3) is that it's more complex and > > > semantics > > > > >> are more subtle. But it might be a good (necessary?) bridge > > > between > > > > (1) > > > > >> and (2): (3) is semantically sound (we ignore errors via passing a > > > flag > > > > >> into commitTx() instead of flush()), and at the same time safe (we > > > force > > > > >> users to explicitly flush() and [hopefully] do proper error > > handling, > > > > and > > > > >> don't rely on an implicit flush() during commitTx() which might be > > > error > > > > >> prone). > > > > >>> > > > > >>> (We also need to find a good and descriptive name for the flag we > pass > > > > into > > > > >> `commitTx()` for this case.) > > > > >>> > > > > >>> > > > > >>> -Matthias > > > > >>> > > > > >>> > > > > >>> > > > > >>> On 6/24/24 8:51 AM, Andrew Schofield wrote: > > > > >>>> Hi Chris, > > > > >>>> That works for me too. I slightly prefer an option on flush(), > but > > > > what > > > > >> you suggested > > > > >>>> works too. > > > > >>>> Thanks, > > > > >>>> Andrew > > > > >>>>> On 24 Jun 2024, at 15:14, Chris Egerton > <chr...@aiven.io.INVALID > > > > > > > >> wrote: > > > > >>>>> > > > > >>>>> Hi Andrew, > > > > >>>>> > > > > >>>>> I like a lot of what you said, but I still believe it's better > to > > > > >> override > > > > >>>>> commitTransaction than flush.
Users will already have to > manually > > > opt > > > > >> in to > > > > >>>>> ignoring errors encountered during transactions, and we can > > > document > > > > >>>>> recommended usage (i.e., explicitly invoking flush() before > > > invoking > > > > >>>>> commitTransaction(ignoreRecordErrors)) in the newly-introduced > > > > method. > > > > >> I > > > > >>>>> don't believe it's worth the increased cognitive load on users > > with > > > > >>>>> non-transactional producers to introduce an overloaded flush() > > > > variant. > > > > >>>>> > > > > >>>>> Cheers, > > > > >>>>> > > > > >>>>> Chris > > > > >>>>> > > > > >>>>> On Mon, Jun 24, 2024 at 9:39 AM Andrew Schofield < > > > > >> andrew_schofi...@live.com> > > > > >>>>> wrote: > > > > >>>>> > > > > >>>>>> Hi Alieh, > > > > >>>>>> Thanks for driving this. Unfortunately, there are many parts > of > > > the > > > > >> API > > > > >>>>>> which > > > > >>>>>> are a bit unfortunate and it’s tricky to make small > improvements > > > > that > > > > >>>>>> don’t have > > > > >>>>>> downsides. > > > > >>>>>> > > > > >>>>>> I don’t like the idea of using a configuration because > > > configuration > > > > >> is > > > > >>>>>> often > > > > >>>>>> outside the application and changing the behaviour of someone > > > else’s > > > > >>>>>> application > > > > >>>>>> without understanding it is risky. Anything which embeds a > > > > >> transactional > > > > >>>>>> producer > > > > >>>>>> could have its behaviour changed unexpectedly. > > > > >>>>>> > > > > >>>>>> It would have been much nicer if send() didn’t fail silently and > > > > change > > > > >> the > > > > >>>>>> transaction > > > > >>>>>> state. But, because it’s an asynchronous operation, I don’t > > really > > > > >> think > > > > >>>>>> we can > > > > >>>>>> just make it throw all exceptions, even though I really think > > that > > > > >>>>>> `send()` is the > > > > >>>>>> method with the problem here.
> > > > >>>>>> > > > > >>>>>> The contract of `flush()` is that it makes sure that all > > preceding > > > > >> sends > > > > >>>>>> will have > > > > >>>>>> completed, so it should be true that a well written > application > > > > would > > > > >> be > > > > >>>>>> able to > > > > >>>>>> know which records were OK because of the > Future<RecordMetadata> > > > > >> returned > > > > >>>>>> by the `send()` method. It should be able to determine whether > > it > > > > >> wants to > > > > >>>>>> commit > > > > >>>>>> the transaction even if some of the intended operations didn’t > > > > >> succeed. > > > > >>>>>> > > > > >>>>>> What we don’t currently have is a way for the application to > say > > > to > > > > >> the > > > > >>>>>> KafkaProducer > > > > >>>>>> that it knows the outcome of sending the records and to > confirm > > > that > > > > >> it > > > > >>>>>> wants to proceed. > > > > >>>>>> Then it would not be necessary for `commitTransaction()` to > > throw > > > an > > > > >>>>>> exception to > > > > >>>>>> report a historical error which the application might choose > to > > > > >> ignore. > > > > >>>>>> > > > > >>>>>> Having read the comments, I think the KIP is on the right > lines > > > > >> focusing > > > > >>>>>> on the `flush()` > > > > >>>>>> method. My suggestion is that we introduce an option on > > `flush()` > > > to > > > > >> be > > > > >>>>>> used before > > > > >>>>>> `commitTransaction()` for applications that want to be able to > > > > commit > > > > >>>>>> transactions which > > > > >>>>>> had known failed operations. 
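Andrew's point is that after `flush()`, the futures returned by `send()` tell the application which records failed. The toy model below sketches that idea under the following assumptions:

- `FutureInspectionDemo` is hypothetical.
- A `CompletableFuture<Long>` stands in for `Future<RecordMetadata>`.
- Records whose names start with `bad` fail.
- The "tolerate at most one failure" policy is made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Toy producer: send() returns a future that flush() completes. A
// CompletableFuture<Long> (an "offset") stands in for Future<RecordMetadata>.
class FutureInspectionDemo {
    private final List<String> pendingRecords = new ArrayList<>();
    private final List<CompletableFuture<Long>> pendingFutures = new ArrayList<>();
    private long nextOffset = 0;

    CompletableFuture<Long> send(String rec) {
        CompletableFuture<Long> future = new CompletableFuture<>();
        pendingRecords.add(rec);
        pendingFutures.add(future);
        return future;
    }

    // flush() contract: when it returns, every preceding send's future is done.
    void flush() {
        for (int i = 0; i < pendingRecords.size(); i++) {
            String rec = pendingRecords.get(i);
            if (rec.startsWith("bad")) {
                pendingFutures.get(i).completeExceptionally(new IllegalArgumentException("rejected: " + rec));
            } else {
                pendingFutures.get(i).complete(nextOffset++);
            }
        }
        pendingRecords.clear();
        pendingFutures.clear();
    }

    static int countFailures(List<CompletableFuture<Long>> futures) {
        int failed = 0;
        for (CompletableFuture<Long> f : futures) {
            // No blocking needed: after flush() every future is already complete.
            if (f.isCompletedExceptionally()) {
                failed++;
            }
        }
        return failed;
    }

    static String demo() {
        FutureInspectionDemo producer = new FutureInspectionDemo();
        List<CompletableFuture<Long>> futures = List.of(
                producer.send("goodRecord1"),
                producer.send("badRecord"),
                producer.send("goodRecord2"));
        producer.flush();
        int failed = countFailures(futures);
        // Hypothetical application policy: tolerate at most one failed record.
        return failed <= 1 ? "commit, " + failed + " record(s) dropped" : "abort";
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```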
> > > > >>>>>> > > > > >>>>>> The code would be: > > > > >>>>>> > > > > >>>>>> producer.beginTransaction(); > > > > >>>>>> > > > > >>>>>> future1 = producer.send(goodRecord1); > > > > >>>>>> future2 = producer.send(badRecord); // The future from this > > > call > > > > >> will > > > > >>>>>> complete exceptionally > > > > >>>>>> future3 = producer.send(goodRecord2); > > > > >>>>>> > > > > >>>>>> producer.flush(FlushOption.TRANSACTION_READY); > > > > >>>>>> > > > > >>>>>> // At this point, we know that all 3 futures are complete > and > > > the > > > > >>>>>> transaction contains 2 records > > > > >>>>>> producer.commitTransaction(); > > > > >>>>>> > > > > >>>>>> I wouldn’t deprecate `flush()` with no option. It just uses > the > > > > >> default > > > > >>>>>> option which behaves > > > > >>>>>> like today. > > > > >>>>>> > > > > >>>>>> Why did I suggest an option on `flush()` rather than > > > > >>>>>> `commitTransaction()`? Because with > > > > >>>>>> `flush()`, it’s clear when the application is stating that > it’s > > > seen > > > > >> all > > > > >>>>>> of the results from its > > > > >>>>>> `send()` calls and it’s ready to proceed. If it has to rely on > > > > >> flushing > > > > >>>>>> that occurs inside > > > > >>>>>> `commitTransaction()`, I don’t see it as clear-cut. > > > > >>>>>> > > > > >>>>>> Thanks, > > > > >>>>>> Andrew > > > > >>>>>> > > > > >>>>>> > > > > >>>>>> > > > > >>>>>>> On 24 Jun 2024, at 13:44, Alieh Saeedi > > > > <asae...@confluent.io.INVALID > > > > >>> > > > > >>>>>> wrote: > > > > >>>>>>> > > > > >>>>>>> Hi all, > > > > >>>>>>> Thanks for the interesting discussion. > > > > >>>>>>> > > > > >>>>>>> I assume that now the main questions are as follows: > > > > >>>>>>> > > > > >>>>>>> 1. Do we need to transition the transaction to the error state > for > > > API > > > > >>>>>>> exceptions? > > > > >>>>>>> 2. Should we throw the API exception in `send()` instead of > > > > >> returning a > > > > >>>>>>> future error? > > > > >>>>>>> 3.
If the answer to question (1) is NO and to question (2) is > > > YES, > > > > >> do we > > > > >>>>>>> need to change the current `flush` or `commitTnx` at all? > > > > >>>>>>> > > > > >>>>>>> Cheers, > > > > >>>>>>> Alieh > > > > >>>>>>> > > > > >>>>>>> On Sat, Jun 22, 2024 at 3:21 AM Matthias J. Sax < > > > mj...@apache.org> > > > > >>>>>> wrote: > > > > >>>>>>> > > > > >>>>>>>> Hey Kirk, > > > > >>>>>>>> > > > > >>>>>>>> Can you elaborate on a few points? > > > > >>>>>>>> > > > > >>>>>>>>> Otherwise users would have to know to explicitly change > their > > > > code > > > > >> to > > > > >>>>>>>> invoke flush(). > > > > >>>>>>>> > > > > >>>>>>>> Why? If we add an option to `flush(FlushOption)`, the > > > > existing > > > > >>>>>>>> `flush()` w/o any option will still be there, right? If we > > would > > > > >> really > > > > >>>>>>>> deprecate existing `flush()`, it would just mean that we > would > > > > pass > > > > >>>>>>>> "default FlushOption" into an implicit flush (and yes, we > > would > > > > >> need to > > > > >>>>>>>> define what this would be). > > > > >>>>>>>> > > > > >>>>>>>> I think there is no clear winner (as pointed out in my last > > > > reply), > > > > >> and > > > > >>>>>>>> both `flush(FlushOption)` and `commitTx(CommitOption)` have > > > > >> advantages > > > > >>>>>>>> and drawbacks. I guess we just need to agree on which tradeoff > > we > > > > >> want to > > > > >>>>>>>> move forward with? > > > > >>>>>>>> > > > > >>>>>>>> > > > > >>>>>>>> Not sure if your database example is a 1:1 fit? I think the > > > > better > > > > >>>>>>>> comparison would be: > > > > >>>>>>>> > > > > >>>>>>>> BEGIN TX; > > > > >>>>>>>> INSERT INTO foo VALUES ('a'); > > > > >>>>>>>> INSERT INTO foo VALUES ('b'); > > > > >>>>>>>> INSERT INTO foo VALUES ('c'); > > > > >>>>>>>> INSERT INTO foo VALUES ('not sure'); > > > > >>>>>>>> > > > > >>>>>>>> For this case, the full TX would roll back, right?
I still > > think > > > > >> that > > > > >>>>>>>> allowing users to just skip over the last error and > continue > > > the > > > > TX > > > > >>>>>>>> would be ok. In the end, we provide a programmatic API, and > > not > > > a > > > > >>>>>>>> declarative one like SQL. Of course, default behavior would > > still > > > be > > > > >> to > > > > >>>>>>>> put the producer into an error state, and the user would need > to > > > call > > > > >>>>>>>> `abortTransaction()` to move forward. > > > > >>>>>>>> > > > > >>>>>>>> > > > > >>>>>>>> -Matthias > > > > >>>>>>>> > > > > >>>>>>>> On 6/21/24 5:26 PM, Kirk True wrote: > > > > >>>>>>>>> Hi Matthias, > > > > >>>>>>>>> > > > > >>>>>>>>>> On Jun 21, 2024, at 12:28 PM, Matthias J. Sax < > > > mj...@apache.org > > > > > > > > > >>>>>> wrote: > > > > >>>>>>>>>> > > > > >>>>>>>>>> If we want to limit it to `RecordTooLargeException`, > throwing > > > > from > > > > >>>>>>>> `send()` directly makes sense. Thanks for calling it out. > > > > >>>>>>>>>> > > > > >>>>>>>>>> It's still a question of backward compatibility? `send()` > > does > > > > >> throw > > > > >>>>>>>> exceptions already, including generic `KafkaException`. Not > > sure > > > > if > > > > >> this > > > > >>>>>>>> helps with backward compatibility? Could we just add a new > > > > exception > > > > >>>>>> type > > > > >>>>>>>> (which is a child of `KafkaException`)? > > > > >>>>>>>>>> > > > > >>>>>>>>>> The Producer JavaDocs are not totally explicit about it > > IMHO. > > > > >>>>>>>>>> > > > > >>>>>>>>>> I think we could expect that some generic error handling > > path > > > > gets > > > > >>>>>>>> executed. For the TX-case, I would assume that a TX would be > > > > >> aborted if > > > > >>>>>>>> `send()` throws or that the producer would be `closed()`. > > > Overall > > > > >> this > > > > >>>>>>>> might be safe?
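This relates to Matthias's backward-compatibility question. Application code that already catches the base exception type keeps working if `send()` starts throwing a new subtype, because the existing generic handler still catches it and aborts. In the toy sketch below, all names are hypothetical:

- `ToyKafkaException` stands in for `KafkaException`.
- `ToyRecordTooLargeException` stands in for the new subtype.
- The 10-character size limit is made up.

```java
// Application-side sketch: run ExistingErrorPath's main to see both outcomes.
class ExistingErrorPath {
    static final int MAX_SIZE = 10; // hypothetical client-side limit, in characters

    // Behavior under discussion: reject an oversized record before send() returns.
    static void send(String rec) {
        if (rec.length() > MAX_SIZE) {
            throw new ToyRecordTooLargeException("record too large: " + rec.length() + " > " + MAX_SIZE);
        }
    }

    // Application code written before the subtype existed: its generic
    // handler still catches the new exception and aborts the transaction.
    static String runTransaction(String... records) {
        try {
            for (String rec : records) {
                send(rec);
            }
            return "committed";
        } catch (ToyKafkaException e) {
            return "aborted: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(runTransaction("ok"));
        System.out.println(runTransaction("ok", "far-too-large-record"));
    }
}

// Toy stand-in for KafkaException (hypothetical).
class ToyKafkaException extends RuntimeException {
    ToyKafkaException(String message) {
        super(message);
    }
}

// Toy stand-in for a new subtype thrown directly from send() (hypothetical).
class ToyRecordTooLargeException extends ToyKafkaException {
    ToyRecordTooLargeException(String message) {
        super(message);
    }
}
```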
> > > > >>>>>>>>>> > > > > >>>>>>>>>>> It would be a little less flexible > > > > >>>>>>>>>>>> though, since (as you note) it would still be impossible > > to > > > > >> commit > > > > >>>>>>>>>>>> transactions after errors have been reported from > brokers. > > > > >>>>>>>>>> > > > > >>>>>>>>>> KS would still need a way to clear the error state of the > > > > >> producer. We > > > > >>>>>>>> could catch a `RecordTooLargeException` from `send()`, call > > the > > > > >> handler > > > > >>>>>> and > > > > >>>>>>>> let it decide what to do next. But if it does return > > `CONTINUE` > > > to > > > > >>>>>> swallow > > > > >>>>>>>> the error and drop the poison pill record on the floor, we > > would > > > > >> want to > > > > >>>>>>>> move forward and commit the transaction. > > > > >>>>>>>>>> > > > > >>>>>>>>>> But the question is: if we cannot add a record to the tx, > > does > > > > the > > > > >>>>>>>> producer need to go into an error state? In the end, we did > throw > > > and > > > > >>>>>> inform > > > > >>>>>>>> the app that the record was _not_ added, and it's up to the > > app > > > to > > > > >>>>>> decide > > > > >>>>>>>> what to do next? > > > > >>>>>>>>> > > > > >>>>>>>>> That’s an excellent question… > > > > >>>>>>>>> > > > > >>>>>>>>> Imagine the user’s application is writing information to a > > > > database > > > > >>>>>>>> instead of Kafka. If there’s a table with a CHAR(1) column > and > > > > this > > > > >> SQL > > > > >>>>>>>> statement was attempted, what should happen? > > > > >>>>>>>>> > > > > >>>>>>>>> INSERT INTO foo VALUES ('not sure'); > > > > >>>>>>>>> > > > > >>>>>>>>> Yes, that DML would fail, sure, but would the user expect > > that > > > > the > > > > >>>>>>>> connection used by the database library would get stuck in some > > kind > > > > of > > > > >>>>>> error > > > > >>>>>>>> state? A user would be able to catch the error and either > > continue > > > or > > > > >>>>>> abort, > > > > >>>>>>>> based on their business rules.
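Kirk's analogy is that a failed statement does not poison the connection; the caller catches the error and continues or aborts. Combined with the `CONTINUE` handler idea mentioned above, it can be sketched as a toy model. `NonPoisoningDemo`, its `Response` enum, the `Handler` interface, and the 10-character limit are all hypothetical. The key assumption is that a rejected `send()` throws without touching transaction state.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model: a rejected send() throws but does NOT poison the transaction.
// Response and Handler are hypothetical, loosely echoing the CONTINUE/FAIL
// handler idea from the thread; they are not Kafka or Kafka Streams APIs.
class NonPoisoningDemo {
    enum Response { CONTINUE, FAIL }

    interface Handler {
        Response handle(String rec, RuntimeException error);
    }

    private final List<String> txRecords = new ArrayList<>();

    void send(String rec) {
        if (rec.length() > 10) { // hypothetical client-side size limit
            throw new IllegalArgumentException("record too large: " + rec);
        }
        txRecords.add(rec); // transaction state changes only on success
    }

    static String process(List<String> input, Handler handler) {
        NonPoisoningDemo producer = new NonPoisoningDemo();
        for (String rec : input) {
            try {
                producer.send(rec);
            } catch (IllegalArgumentException e) {
                if (handler.handle(rec, e) == Response.FAIL) {
                    return "aborted"; // business rule: give up on this transaction
                }
                // CONTINUE: drop the poison-pill record and keep going.
            }
        }
        return "committed " + producer.txRecords;
    }

    public static void main(String[] args) {
        List<String> input = List.of("a", "way-too-large-record", "b");
        System.out.println(process(input, (rec, error) -> Response.CONTINUE));
        System.out.println(process(input, (rec, error) -> Response.FAIL));
    }
}
```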
> > > > >>>>>>>>> > > > > >>>>>>>>> So I agree with what I believe you’re implying: we > shouldn’t > > > > >> poison the > > > > >>>>>>>> Producer/TransactionManager on certain types of > > > application-level > > > > >>>>>> errors in > > > > >>>>>>>> send(). > > > > >>>>>>>>> > > > > >>>>>>>>> Kirk > > > > >>>>>>>>> > > > > >>>>>>>>>> If we report the error only via the `Callback` it's a > > > different > > > > >> story, > > > > >>>>>>>> because the contract for this case is clearly specified on > the > > > > >> JavaDocs: > > > > >>>>>>>>>> > > > > >>>>>>>>>>> When used as part of a transaction, it is not necessary > to > > > > >> define a > > > > >>>>>>>> callback or check the result of the future > > > > >>>>>>>>>>> in order to detect errors from <code>send</code>. If any > of > > > the > > > > >> send > > > > >>>>>>>> calls failed with an irrecoverable error, > > > > >>>>>>>>>>> the final {@link #commitTransaction()} call will fail and > > > throw > > > > >> the > > > > >>>>>>>> exception from the last failed send. When > > > > >>>>>>>>>>> this happens, your application should call {@link > > > > >>>>>> #abortTransaction()} > > > > >>>>>>>> to reset the state and continue to send > > > > >>>>>>>>>>> data. > > > > >>>>>>>>>> > > > > >>>>>>>>>> > > > > >>>>>>>>>> > > > > >>>>>>>>>> -Matthias > > > > >>>>>>>>>> > > > > >>>>>>>>>> > > > > >>>>>>>>>> On 6/21/24 11:42 AM, Chris Egerton wrote: > > > > >>>>>>>>>>> Hi Artem, > > > > >>>>>>>>>>> I think it'd make sense to throw directly from send > > whenever > > > > >>>>>> possible, > > > > >>>>>>>>>>> instead of returning an already-completed future. I > didn't > > do > > > > >> that in > > > > >>>>>>>> my > > > > >>>>>>>>>>> bug fix to try to be conservative about breaking changes > > but > > > > this > > > > >>>>>>>> seems to > > > > >>>>>>>>>>> have caused its own set of headaches. 
It would be a > little > > > less > > > > >>>>>>>> flexible > > > > >>>>>>>>>>> though, since (as you note) it would still be impossible > to > > > > >> commit > > > > >>>>>>>>>>> transactions after errors have been reported from > brokers. > > > > >>>>>>>>>>> I'll leave it up to the Kafka Streams folks to decide if > > that > > > > >>>>>>>> flexibility > > > > >>>>>>>>>>> is required. If it is, then users could explicitly call > > > flush() > > > > >>>>>> before > > > > >>>>>>>>>>> committing (and ignoring errors for) or aborting a > > > transaction, > > > > >> if > > > > >>>>>> they > > > > >>>>>>>>>>> want to implement fine-grained error handling logic such > as > > > > >> allowing > > > > >>>>>>>> errors > > > > >>>>>>>>>>> for a subset of topics to be ignored. > > > > >>>>>>>>>>> Hi Matthias, > > > > >>>>>>>>>>> Most of the time you're right and we can't throw from > > send(); > > > > >>>>>> however, > > > > >>>>>>>> in > > > > >>>>>>>>>>> this case (client-side record-too-large exception), the > > error > > > > is > > > > >>>>>>>> actually > > > > >>>>>>>>>>> noticed by the producer before send() returns, so it > should > > > be > > > > >>>>>>>> possible to > > > > >>>>>>>>>>> throw directly. > > > > >>>>>>>>>>> Cheers, > > > > >>>>>>>>>>> Chris > > > > >>>>>>>>>>> On Fri, Jun 21, 2024, 14:25 Matthias J. Sax < > > > mj...@apache.org> > > > > >>>>>> wrote: > > > > >>>>>>>>>>>> Not sure if we can change send and make it throw, given > > that > > > > >> send() > > > > >>>>>> is > > > > >>>>>>>>>>>> async? That is why users can register a `Callback` to > > begin > > > > >> with, > > > > >>>>>>>> right? > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> And Alieh's point about backward compatibility is also a > > > fair > > > > >>>>>> concern. 
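This thread discusses two reporting styles: returning an already-completed failed future, or throwing directly from `send()`. The difference is easy to see with a caller that ignores the returned future. In the minimal sketch below, the method names are hypothetical. It also illustrates the backward-compatibility concern: code that silently missed the failed future would start seeing an exception.

```java
import java.util.concurrent.CompletableFuture;

// Two hypothetical ways a send() could report an error detected before it returns.
class SendErrorStyles {
    // Style 1: return an already-failed future; the caller must inspect it.
    static CompletableFuture<Long> sendReturningFailedFuture(String rec) {
        return CompletableFuture.failedFuture(new IllegalArgumentException("too large: " + rec));
    }

    // Style 2: throw directly; the caller cannot miss it.
    static CompletableFuture<Long> sendThrowing(String rec) {
        throw new IllegalArgumentException("too large: " + rec);
    }

    // A fire-and-forget caller that never looks at the returned future.
    static boolean failureNoticedByCaller(boolean throwing) {
        try {
            if (throwing) {
                sendThrowing("big-record");
            } else {
                sendReturningFailedFuture("big-record"); // returned future is ignored
            }
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("failed future noticed: " + failureNoticedByCaller(false));
        System.out.println("thrown exception noticed: " + failureNoticedByCaller(true));
    }
}
```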
> > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>>> Actually, this would potentially be even > > > > >>>>>>>>>>>>> worse than the original buggy behavior because the bug > > was > > > > >> that we > > > > >>>>>>>>>>>> ignored > > > > >>>>>>>>>>>>> errors that happened in the "send()" method itself, not > > > > >> necessarily > > > > >>>>>>>> the > > > > >>>>>>>>>>>>> ones that we got from the broker. > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> My understanding was that `commitTx(swallowError)` would > > > only > > > > >>>>>> swallow > > > > >>>>>>>>>>>> `send()` errors, not errors about the actual commit. I > > > agree > > > > >> that > > > > >>>>>> it > > > > >>>>>>>>>>>> would be very bad to swallow errors about the actual tx > > > > >> commit... > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> It's a fair question if this might be too subtle; to > make > > it > > > > >>>>>> explicit, > > > > >>>>>>>>>>>> we could use `CommitOptions#ignorePendingSendErrors()` > > > [working > > > > >> name] > > > > >>>>>> to > > > > >>>>>>>>>>>> make it clear. > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> If we think it's too subtle to change commit to swallow > > > send() > > > > >>>>>> errors, > > > > >>>>>>>>>>>> maybe going with `flush(FlushOptions)` would be clearer > > (and > > > > we > > > > >> can > > > > >>>>>>>> use > > > > >>>>>>>>>>>> `FlushOption#swallowSendErrorsForTransactions()` > [working > > > > name] > > > > >> to > > > > >>>>>> make > > > > >>>>>>>>>>>> explicit that the `FlushOption` for now has only an > > effect > > > > for > > > > >>>>>> TX). > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> Thoughts?
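Matthias describes a narrow scope for a flag like `CommitOptions#ignorePendingSendErrors()`, which is a working name and not an existing API. The flag skips recorded `send()` errors, but never swallows a failure of the commit itself. The toy sketch below illustrates that scope with hypothetical field and method names.

```java
// Toy model of the scope described for a flag like ignorePendingSendErrors
// (a working name from the thread, not an existing API).
class CommitScopeDemo {
    RuntimeException lastSendError;  // set when a send() failed (toy)
    boolean commitRequestFails;      // simulates the commit itself failing

    void commitTx(boolean ignorePendingSendErrors) {
        if (lastSendError != null && !ignorePendingSendErrors) {
            throw lastSendError;     // send errors: surfaced unless explicitly ignored
        }
        if (commitRequestFails) {
            // Errors about the actual commit are never swallowed by the flag.
            throw new IllegalStateException("commit failed");
        }
    }

    static String attempt(boolean sendFailed, boolean commitFails, boolean ignore) {
        CommitScopeDemo tx = new CommitScopeDemo();
        if (sendFailed) {
            tx.lastSendError = new IllegalArgumentException("send failed");
        }
        tx.commitRequestFails = commitFails;
        try {
            tx.commitTx(ignore);
            return "committed";
        } catch (RuntimeException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(attempt(true, false, true));   // send error ignored
        System.out.println(attempt(true, false, false));  // send error surfaced
        System.out.println(attempt(false, true, true));   // commit error still surfaced
    }
}
```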
> > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> -Matthias > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> On 6/21/24 4:10 AM, Alieh Saeedi wrote: > > > > >>>>>>>>>>>>> Hi all, > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> It is very exciting to see all the experts here raising > > > very > > > > >> good > > > > >>>>>>>> points. > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> As we go further, we see more and more options to > improve > > > our > > > > >>>>>>>> solution, > > > > >>>>>>>>>>>>> which makes concluding and updating the KIP impossible. > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> The main suggestions so far are: > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> 1. `flush` with `flushOptions` as input parameter > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> 2. `commitTx` with `commitOptions` as input parameter > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> 3. `send` must throw the exception > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> My concern about the 3rd suggestion: > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> 1. Does the change cause any issue with backward > > > > compatibility? > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> 2. The `send (bad record)` already transitions the > > transaction > > > > to > > > > >> the > > > > >>>>>>>> error > > > > >>>>>>>>>>>>> state. No user, including Streams, is able to transition > the > > > > >>>>>> transaction > > > > >>>>>>>> back > > > > >>>>>>>>>>>>> from the error state. Do you mean we remove the > > > > >>>>>>>>>>>>> `maybeTransitionToErrorState(e)` from here > > > > >>>>>>>>>>>>> < > > > > >>>>>>>>>>>> > > > > >>>>>>>> > > > > >>>>>> > > > > >> > > > > > > > > > > https://github.com/apache/kafka/blob/9b5b434e2a6b2d5290ea403fc02859b1c523d8aa/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1112 > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> as well?
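Alieh's point 2 describes a one-way transition. A bad `send()` moves the transaction into an error state, commit then throws, and only an abort leads back to a usable state, as the Javadoc quoted earlier in the thread also says. The simplified toy state machine below illustrates this. Its state names are loosely inspired by the producer's transaction states. Everything here is hypothetical and is not the actual `TransactionManager` code.

```java
// Simplified toy state machine; states and methods are illustrative only.
class TxStateDemo {
    enum State { READY, IN_TRANSACTION, ABORTABLE_ERROR }

    State state = State.READY;
    private RuntimeException lastError;

    void beginTransaction() {
        requireState(State.READY);
        state = State.IN_TRANSACTION;
    }

    // Echoes the idea of maybeTransitionToErrorState(e): a bad record moves the
    // whole transaction into an error state instead of failing only that record.
    void send(String rec) {
        requireState(State.IN_TRANSACTION);
        if (rec.startsWith("bad")) {
            lastError = new IllegalArgumentException("bad record: " + rec);
            state = State.ABORTABLE_ERROR;
        }
    }

    void commitTransaction() {
        if (state == State.ABORTABLE_ERROR) {
            throw lastError; // the last send error resurfaces at commit time
        }
        requireState(State.IN_TRANSACTION);
        state = State.READY;
    }

    // In this toy, the only way out of the error state.
    void abortTransaction() {
        lastError = null;
        state = State.READY;
    }

    private void requireState(State expected) {
        if (state != expected) {
            throw new IllegalStateException("expected " + expected + " but was " + state);
        }
    }

    public static void main(String[] args) {
        TxStateDemo tx = new TxStateDemo();
        tx.beginTransaction();
        tx.send("good");
        tx.send("bad-1");
        try {
            tx.commitTransaction();
        } catch (IllegalArgumentException e) {
            System.out.println("commit threw: " + e.getMessage());
            tx.abortTransaction();
        }
        System.out.println("state after abort: " + tx.state);
    }
}
```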
> > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> Cheers, > > > > >>>>>>>>>>>>> Alieh > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> On Fri, Jun 21, 2024 at 8:45 AM Andrew Schofield < > > > > >>>>>>>>>>>> andrew_schofi...@live.com> > > > > >>>>>>>>>>>>> wrote: > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> Hi Artem, > > > > >>>>>>>>>>>>>> I think you make a good point which is worth further > > > > >>>>>> consideration. > > > > >>>>>>>> If > > > > >>>>>>>>>>>>>> any of the existing methods is really ripe for a > change > > > > here, > > > > >> it’s > > > > >>>>>>>> the > > > > >>>>>>>>>>>>>> send() that actually caused the problem. If that can > be > > > > fixed > > > > >> so > > > > >>>>>>>> there > > > > >>>>>>>>>>>> are > > > > >>>>>>>>>>>>>> no situations in which a lurking error breaks a > > > transaction, > > > > >> that > > > > >>>>>>>> might > > > > >>>>>>>>>>>> be > > > > >>>>>>>>>>>>>> the best. > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> Thanks, > > > > >>>>>>>>>>>>>> Andrew > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> On 21 Jun 2024, at 01:51, Artem Livshits < > > > > >> alivsh...@confluent.io > > > > >>>>>>>>>>>> .INVALID> > > > > >>>>>>>>>>>>>> wrote: > > > > >>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> I thought we still wait for requests (and their > > errors) > > > to > > > > >> come > > > > >>>>>>>> in and > > > > >>>>>>>>>>>>>>> could handle fatal errors appropriately. > > > > >>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> We do wait for requests, but my understanding is that > > > when > > > > >>>>>>>>>>>>>>> commitTransaction("ignore send errors") we want to > > ignore > > > > >> errors. > > > > >>>>>>>> So > > > > >>>>>>>>>>>> if > > > > >>>>>>>>>>>>>> we > > > > >>>>>>>>>>>>>>> do > > > > >>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> 1. send > > > > >>>>>>>>>>>>>>> 2. commitTransaction("ignore send errors") > > > > >>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> the commit will succeed. 
You can look at the example in https://issues.apache.org/jira/browse/KAFKA-9279 and just substitute commitTransaction with commitTransaction("ignore send errors") and we get the buggy behavior back :-). Actually, this would potentially be even worse than the original buggy behavior because the bug was that we ignored errors that happened in the "send()" method itself, not necessarily the ones that we got from the broker.

Actually, looking at https://github.com/apache/kafka/pull/11508/files, wouldn't a better solution be to just throw the error from the "send" method itself, rather than trying to set it to be thrown during commit? This way the example in https://issues.apache.org/jira/browse/KAFKA-9279 would be fixed, and at the same time it would give an opportunity for KS to catch the error and ignore it if needed. Not sure if we need a KIP for that, just do a better fix of the old bug.
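A toy model can make the failure mode above concrete. In this sketch, `ToyTxProducer`, its `commitTransaction(boolean ignoreSendErrors)` flag and the `throwFromSend` switch are hypothetical stand-ins for the options discussed here, not the real `KafkaProducer` API, and a record counts as "bad" simply when it exceeds an assumed size limit. It contrasts an "ignore send errors" commit, which silently drops the bad record, with throwing from `send()`, which lets the caller catch and skip the record explicitly.

```java
import java.util.ArrayList;
import java.util.List;

class Kafka9279Sketch {
    public static void main(String[] args) {
        // "Ignore send errors" commit: succeeds, and the bad record is silently missing.
        ToyTxProducer ignoring = new ToyTxProducer(false);
        ignoring.send("ok");
        ignoring.send("way-too-large-record");
        ignoring.commitTransaction(true);
        System.out.println("committed with ignore flag: " + ignoring.committed);

        // Throwing from send(): the caller sees the error where it happens
        // and decides explicitly to skip the record before committing.
        ToyTxProducer throwing = new ToyTxProducer(true);
        throwing.send("ok");
        try {
            throwing.send("way-too-large-record");
        } catch (IllegalArgumentException e) {
            System.out.println("caught at send(): " + e.getMessage());
        }
        throwing.commitTransaction(false);
        System.out.println("committed after explicit handling: " + throwing.committed);
    }
}

// Hypothetical stand-in for a transactional producer; NOT the Kafka client API.
class ToyTxProducer {
    static final int MAX_RECORD_SIZE = 10; // assumed limit that makes a record "bad"

    private final boolean throwFromSend;   // models "throw the error from send() itself"
    private final List<String> pending = new ArrayList<>();
    final List<String> committed = new ArrayList<>();
    private IllegalArgumentException lastSendError; // error remembered until commit

    ToyTxProducer(boolean throwFromSend) {
        this.throwFromSend = throwFromSend;
    }

    void send(String record) {
        if (record.length() > MAX_RECORD_SIZE) {
            IllegalArgumentException e =
                new IllegalArgumentException("record too large: " + record);
            if (throwFromSend) {
                throw e;           // surfaced immediately to the caller
            }
            lastSendError = e;     // otherwise it lurks until commit time
            return;
        }
        pending.add(record);
    }

    // ignoreSendErrors models the proposed "ignore send errors" commit option.
    void commitTransaction(boolean ignoreSendErrors) {
        if (lastSendError != null && !ignoreSendErrors) {
            IllegalArgumentException e = lastSendError;
            lastSendError = null;
            pending.clear();       // in this toy, nothing from the transaction is committed
            throw e;
        }
        lastSendError = null;      // error dropped without being reported
        committed.addAll(pending);
        pending.clear();
    }
}
```

With the ignore flag, the commit succeeds and only `ok` is committed, with no signal that the other record was lost; with the throwing `send()`, the same outcome is reached only through an explicit `catch` at the call site.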
-Artem

On Thu, Jun 20, 2024 at 4:58 PM Justine Olshan <jols...@confluent.io.invalid> wrote:

I'm a bit late to the party, but the discussion here looks reasonable. Moving the logic to a transactional method makes sense to me and makes me feel a bit better about keeping the complexity in the methods relevant to the issue.

> One minor concern is that if we set "ignore send errors" (or whatever we decide to name it) option without explicit flush, it'll actually lead to broken behavior as the application won't be able to stop a commit from proceeding even on fatal errors.

Is this with respect to the case a request is still inflight when we call commitTransaction? I thought we still wait for requests (and their errors) to come in and could handle fatal errors appropriately.
Justine

On Thu, Jun 20, 2024 at 4:32 PM Artem Livshits <alivsh...@confluent.io.invalid> wrote:

Hi Matthias (and other folks who suggested ideas),

> maybe `commitTransaction(CommitOptions)` or similar could be a good way forward?

I like this approach. One minor concern is that if we set "ignore send errors" (or whatever we decide to name it) option without explicit flush, it'll actually lead to broken behavior as the application won't be able to stop a commit from proceeding even on fatal errors. But I guess we'll just have to clearly document it.

In some way we are basically adding a flag to optionally restore the https://issues.apache.org/jira/browse/KAFKA-9279 bug, which is the motivation for all these changes, anyway :-).

-Artem

On Thu, Jun 20, 2024 at 2:18 PM Matthias J. Sax <mj...@apache.org> wrote:

Seems the option to use a config does not get a lot of support.

So we need to go with some form of "overload / new method". I think Chris' point about not coupling it to `flush()` but rather `commitTransaction()` is actually a very good one; for the non-tx case, the different flush variants would not make sense.

I also like Lianet's idea to pass in some "options" object, so maybe `commitTransaction(CommitOptions)` or similar could be a good way forward? It's much better than a `boolean` parameter, aesthetically, as well as extendable in the future if necessary.

Given that we would pass in an optional parameter, we might not even need to deprecate the existing `commitTransaction()` method?

-Matthias

On 6/20/24 9:12 AM, Andrew Schofield wrote:

Hi Alieh,
Thanks for the KIP.

I *really* don’t like adding a config which changes the behaviour of the flush() method.
We already have too many configs. But I totally understand the problem that you’re trying to solve and some of the other suggestions in this thread seem neater.

Personally, I would add another method to KafkaProducer. Not an overload on flush() because this is not flush() at all. Using Matthias’s options, I prefer (3).

Thanks,
Andrew

On 20 Jun 2024, at 15:08, Lianet M. <liane...@gmail.com> wrote:

Hi all, thanks for the KIP Alieh!

LM1. Totally agree with Artem's point about the config not being the most explicit/flexible way to express this capability. Getting then to Matthias' 4 options, what I don't like about 3 and 4 is that it seems they might not age very well?
Aren't we going to be wanting some other twist to the flush semantics that will have us adding yet another param to it, or another overloaded method? I truly don't have the context to answer that, but if it feels like a realistic future, maybe adding some kind of FlushOptions param to the flush would be better from an extensibility point of view. It would only have the clearErrors option available for now but could accept any other we may need. I find that this would remove the "ugliness" Matthias pointed out for 3. and 4.

LM2. No matter how we end up expressing the different semantics for flush, let's make sure we update the KIP on the flush and commitTransaction Javadocs. It currently states that flush "clears the last exception" and commitTransaction "will NOT throw" if called after flush, but it really all depends on the config/options/method used.
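LM1 can be sketched roughly as follows. `FlushOptions`, its `clearErrors` option and `ToyFlushProducer` are hypothetical names (nothing here exists in the Kafka client), and the toy additionally assumes, following AL1, that clearing an error also rethrows it. The point is the shape: the existing no-arg `flush()` keeps its behavior by delegating with default options, and new options become new fields rather than new overloads.

```java
class FlushOptionsSketch {
    public static void main(String[] args) {
        ToyFlushProducer producer = new ToyFlushProducer();
        producer.recordSendError(new IllegalStateException("send failed"));

        producer.flush(); // default options: error is neither cleared nor thrown
        System.out.println("pending after flush(): " + producer.hasPendingError());

        try {
            producer.flush(new FlushOptions().clearErrors(true));
        } catch (IllegalStateException e) {
            System.out.println("flush(clearErrors) threw: " + e.getMessage());
        }
        System.out.println("pending afterwards: " + producer.hasPendingError());
    }
}

// Hypothetical options object in the spirit of LM1; NOT an existing Kafka class.
final class FlushOptions {
    // The only option for now; further options can be added as fields
    // without introducing new flush() overloads.
    private boolean clearErrors = false;

    FlushOptions clearErrors(boolean clearErrors) {
        this.clearErrors = clearErrors;
        return this; // fluent style: new FlushOptions().clearErrors(true)
    }

    boolean clearErrors() {
        return clearErrors;
    }
}

// Hypothetical producer stand-in showing how the options would be consumed.
class ToyFlushProducer {
    private IllegalStateException lastSendError;

    void recordSendError(IllegalStateException e) {
        lastSendError = e;
    }

    boolean hasPendingError() {
        return lastSendError != null;
    }

    // The existing no-arg method keeps its behavior by delegating with default options.
    void flush() {
        flush(new FlushOptions());
    }

    void flush(FlushOptions options) {
        // Waiting for in-flight requests is omitted in this toy model.
        if (options.clearErrors() && lastSendError != null) {
            IllegalStateException e = lastSendError;
            lastSendError = null; // cleared, so a later commit is not blocked by it
            throw e;              // but still reported to the caller (AL1 assumption)
        }
    }
}
```

Compared with a `flush(boolean clearAndThrow)` overload, adding a second option later would mean one more fluent setter on `FlushOptions` rather than another method signature.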
LM3. I find it would be helpful to include an example to show the new flow that we're unblocking (I see this as the great gain here): flush with clear error option enabled -> catch and do whatever error handling we want -> commitTransaction successfully

Thanks!

Lianet

On Wed, Jun 19, 2024 at 11:26 PM Chris Egerton <fearthecel...@gmail.com> wrote:

Hi Matthias,

I like the alternatives you've listed. One more that might help is if, instead of overloading flush(), we overloaded commitTransaction() to something like commitTransaction(boolean tolerateRecordErrors).
This seems slightly cleaner in that it takes the behavioral change we want, which only applies to transactional producers, to an API method that is only used for transactional producers. It would also avoid the issue of whether or not flush() (or a new variant of it with altered semantics) should throw or not. Thoughts?

Hi Alieh,

Thanks for the KIP, I like this direction a lot more than the pluggable handler!

I share Artem's concerns that enabling this behavior via configuration doesn't seem like a great fit.
It's likely that application code will be written in a style that only works with one type of behavior from transactional producers, so requiring that application code to declare its expectations for the behavior of its producer seems more appropriate than, e.g., allowing users deploying that application to tweak a configuration file that gets fed to producers spun up inside it.

Cheers,

Chris

On Wed, Jun 19, 2024 at 10:32 PM Matthias J. Sax <mj...@apache.org> wrote:

Thanks for the KIP Alieh. I actually like the KIP as-is, but think Artem raises very good points...

Seems we have four options on how to move forward?

1. add config to allow "silent error clearance" as the KIP proposes
2. change flush() to clear error and let it throw
3. add new `flushAndThrow()` (or better name) which clears error and throws
4. add `flush(boolean clearAndThrow)` and let user pick (and deprecate existing `flush()`)

For (2), given that it would be a behavior change, we might also need a public "feature flag" config.

It seems both (1) and (2) have the issue Artem mentioned. (3) and (4) would be safer to this end, however, for both we kinda get an ugly API?

Not sure right now if I have any preference. Seems we need to pick some evil and that there is no clear best solution? It would be good to hear from others what they think.

-Matthias

On 6/18/24 8:39 PM, Artem Livshits wrote:

Hi Alieh,

Thank you for the KIP.
I have a couple of suggestions:

AL1. We should throw an error from flush after we clear it. This would make it so that both "send + commit" and "send + flush + commit" (the latter looks like just a more verbose way to express the former, and it would be intuitive if it behaves the same) would throw if the transaction has an error (so if the code is written either way, it's going to be correct). At the same time, the latter could be extended by the caller to intercept exceptions from flush, ignore as needed, and commit the transaction. This solution would keep basic things simple (if someone has code that doesn't require advanced error handling, then basic "send + flush + commit" would do the right thing) and advanced things possible: an application can add try + catch around flush and ignore some errors.
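The AL1 flow can be sketched with a toy model. `ToyProducer` is a hypothetical stand-in, not `KafkaProducer`: `send()` records a failure instead of throwing, `flush()` clears a pending error and then throws it, and `commitTransaction()` refuses to commit while an error is still pending; a record counts as "bad" when it exceeds an assumed size limit. Under those assumptions, "send + commit" and "send + flush + commit" both fail by default, while a caller that catches the exception from `flush()` can still commit.

```java
import java.util.ArrayList;
import java.util.List;

class FlushThenCommitSketch {
    public static void main(String[] args) {
        System.out.println("send + commit: " + run(false, false));
        System.out.println("send + flush + commit: " + run(true, false));
        System.out.println("send + catch(flush) + commit: " + run(true, true));
    }

    // Returns what was committed, or "failed: <msg>" if an exception escaped.
    static String run(boolean flushFirst, boolean ignoreFlushError) {
        ToyProducer p = new ToyProducer();
        p.send("ok");
        p.send("this-record-is-too-large");
        try {
            if (flushFirst) {
                try {
                    p.flush();
                } catch (IllegalStateException e) {
                    if (!ignoreFlushError) {
                        throw e; // default: propagate, so commit is never reached
                    }
                    // Application-specific handling goes here (e.g. log and skip the record).
                }
            }
            p.commitTransaction();
            return "committed " + p.committed;
        } catch (IllegalStateException e) {
            return "failed: " + e.getMessage();
        }
    }
}

// Hypothetical stand-in for a transactional producer; NOT the Kafka client API.
class ToyProducer {
    static final int MAX_RECORD_SIZE = 10; // assumed limit that makes a record "bad"

    private final List<String> pending = new ArrayList<>();
    final List<String> committed = new ArrayList<>();
    private IllegalStateException lastError;

    void send(String record) {
        if (record.length() > MAX_RECORD_SIZE) {
            lastError = new IllegalStateException("send failed for " + record);
            return;
        }
        pending.add(record);
    }

    // AL1: clear the pending error, then throw it so the caller must decide.
    // (Waiting for in-flight requests is omitted in this toy model.)
    void flush() {
        IllegalStateException e = lastError;
        lastError = null;
        if (e != null) {
            throw e;
        }
    }

    void commitTransaction() {
        if (lastError != null) {
            throw lastError; // an uncleared error blocks the commit
        }
        committed.addAll(pending);
        pending.clear();
    }
}
```

Running `main` prints one line per scenario: the first two report the send failure, and only the third, where the caller explicitly swallowed the exception from `flush()`, commits `[ok]`.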
AL2. I'm not sure if config is the best way to express the modification of the "flush" semantics -- the application logic that calls "flush" needs to match the "flush" semantics, and configuring semantics in a detached place creates room for bugs due to discrepancies. This can be especially bad if the producer loads configuration from a file at run time; in that case a mistake in configuration could break the application because it was written to expect one "flush" semantics but the semantics is switched. Given that the "flush" semantics needs to match the caller's expectation, a way to accomplish that would be to pass the caller's expectation to the "flush" call, by either having a method with a different name or having an overload with a Boolean flag that would configure the semantics (the current method could just redirect to the new one).

-Artem

On Mon, Jun 17, 2024 at 9:09 AM Alieh Saeedi <asae...@confluent.io.invalid> wrote:

Hi all,

I'd like to kick off a discussion for KIP-1059 that suggests adding a new feature to the Producer flush() method.
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-1059%3A+Enable+the+Producer+flush%28%29+method+to+clear+the+latest+send%28%29+error>