Hi all,

Thanks for keeping up the momentum of our discussion.


I see a rough consensus on the main points. It seems that we agreed on the
following:

1) Define `commitTnx(commitOptions)` so that it can clear the error.

2) Require the user to explicitly call `flush()` before
`commitTnx(commitOptions)` if they decide to ignore errors.
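To make the two points concrete, here is a rough sketch in Java. It is a toy, in-memory model and not the real `KafkaProducer`. The class names, `CommitOptions`, and the `ignoreSendErrors` flag are hypothetical placeholders, since the option name is still open. A record whose value starts with "bad" stands in for a failed send. The sketch assumes Matthias's option 3b: if `flush()` was not called explicitly, the flag is not honored and the last send error is thrown.

```java
import java.util.ArrayList;
import java.util.List;

public class CommitOptionsSketch {

    // Hypothetical options holder; the real name and shape are still open in the KIP.
    static final class CommitOptions {
        private boolean ignoreSendErrors = false;

        CommitOptions ignoreSendErrors(boolean ignore) {
            this.ignoreSendErrors = ignore;
            return this;
        }

        boolean ignoreSendErrors() {
            return ignoreSendErrors;
        }
    }

    // Toy stand-in for a transactional producer (NOT KafkaProducer): a record whose
    // value starts with "bad" fails when its send completes during a flush.
    static final class ToyProducer {
        private final List<String> pending = new ArrayList<>();
        private final List<String> inTransaction = new ArrayList<>();
        final List<String> committed = new ArrayList<>();
        private RuntimeException lastSendError = null;
        private boolean explicitlyFlushed = false;

        void send(String value) {
            pending.add(value);
            explicitlyFlushed = false; // a new send invalidates an earlier flush()
        }

        // Explicit flush: afterwards the application has seen every send result.
        void flush() {
            completePendingSends();
            explicitlyFlushed = true;
        }

        void commitTransaction(CommitOptions options) {
            boolean sawAllResults = explicitlyFlushed;
            completePendingSends(); // the implicit flush inside commit
            // Option 3b: without an explicit flush(), the ignore flag is not honored.
            boolean mayIgnore = options.ignoreSendErrors() && sawAllResults;
            if (lastSendError != null && !mayIgnore) {
                throw lastSendError; // not committed; the caller should abort
            }
            committed.addAll(inTransaction); // failed records were never added
            resetTransaction(); // point 1): committing clears the error
        }

        void abortTransaction() {
            pending.clear();
            resetTransaction();
        }

        private void resetTransaction() {
            inTransaction.clear();
            lastSendError = null;
            explicitlyFlushed = false;
        }

        private void completePendingSends() {
            for (String value : pending) {
                if (value.startsWith("bad")) {
                    lastSendError = new IllegalStateException("send failed: " + value);
                } else {
                    inTransaction.add(value);
                }
            }
            pending.clear();
        }
    }

    public static void main(String[] args) {
        // Agreed pattern: explicit flush(), then commit with the (hypothetical) option.
        ToyProducer producer = new ToyProducer();
        producer.send("good-1");
        producer.send("bad-1");
        producer.send("good-2");
        producer.flush();
        producer.commitTransaction(new CommitOptions().ignoreSendErrors(true));
        System.out.println(producer.committed); // prints [good-1, good-2]

        // Missing flush(): the flag is not honored and the send error surfaces.
        ToyProducer other = new ToyProducer();
        other.send("bad-2");
        try {
            other.commitTransaction(new CommitOptions().ignoreSendErrors(true));
        } catch (IllegalStateException e) {
            // prints "not committed: send failed: bad-2"
            System.out.println("not committed: " + e.getMessage());
            other.abortTransaction();
        }
    }
}
```

With default options (flag not set), the sketch keeps today's behavior: any send error fails the commit. The point it illustrates is that the decision to ignore errors is honored only after an explicit `flush()` has made every send result visible to the application.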


I updated the KIP with the above-mentioned points. Please take a look. I am
sure it is not perfect yet, and there are/will be some open questions, but
if you agree, I will open voting as well. Of course, the discussion can
still carry on in this thread.


Cheers,

Alieh

On Tue, Jun 25, 2024 at 11:36 AM Chris Egerton <fearthecel...@gmail.com>
wrote:

> Hi Artem,
>
> Yes, I completely agree that by default, special action shouldn't be
> required from users to prevent transactions from being committed when one
> or more records can't be sent. The behavior I was suggesting was only
> relevant to the new API we're discussing where we allow users to
> intentionally bypass that logic when invoking commitTransaction.
>
> Cheers,
>
> Chris
>
> On Tue, Jun 25, 2024, 01:44 Artem Livshits <alivsh...@confluent.io.invalid>
> wrote:
>
> > Hey folks,
> >
> > Great discussion!
> >
> > Re: throwing exceptions from send().  send() is documented to throw
> > KafkaException, so if the application doesn't handle it, it should be a
> > bug.  Now, it does have a note that API exceptions wouldn't be thrown; I'm
> > not sure if we have code that relies on that.  There is a reason exceptions
> > have classes: they are designed to express a "class of errors" that can be
> > handled, so that we don't have to add a flag or a new method every time we
> > have a new exception to throw.  But if there is consensus that it's still
> > too risky (especially if we have examples of code that gets broken), then I
> > agree that we shouldn't do it.
> >
> > Re: various ways to communicate semantics change.  If we must have 2
> > different behaviors, I think passing options to "ignore errors" to
> > commitTransaction is probably the most intuitive way to do it.  What I
> > really don't like (in any of the options) is that we cannot really
> > document it in a way that articulates a value in the product.  There are
> > tons of nuances that require understanding some buggy behavior, then fixed
> > behavior, then an option to sometimes turn on buggy behavior, etc.
> >
> > > if a user invokes Producer::abortTransaction from within a producer
> > > callback today
> >
> > I think we would get an invalid state exception, which we could probably
> > fix. But even if we supported it, I think it would be good if doing send +
> > commit led to an aborted transaction without special action from the
> > application -- the simple things should be really simple: any failure
> > during send or commit should abort the send + commit sequence without
> > special handling.
> >
> > -Artem
> >
> > On Mon, Jun 24, 2024 at 6:37 PM Chris Egerton <fearthecel...@gmail.com>
> > wrote:
> >
> > > One quick thought: if a user invokes Producer::abortTransaction from within
> > > a producer callback today, even in the midst of an ongoing call to
> > > Producer::commitTransaction, what is the behavior? Would it be reasonable
> > > to support this behavior as a way to allow error handling to take place
> > > during implicit flushes, via producer callback?
> > >
> > > On Mon, Jun 24, 2024 at 9:21 PM Matthias J. Sax <mj...@apache.org>
> > > wrote:
> > >
> > > > My point is that it does not seem to be safe to allow users to ignore
> > > > errors with an implicit flush, and I think it's better to only allow it
> > > > with (ie, after) an explicit flush().
> > > >
> > > > My reasoning is that users should make a decision to ignore errors or
> > > > not before calling `commitTx()`, but after inspecting all potential
> > > > send errors. With an implicit flush, users need to "blindly" decide to
> > > > ignore send errors, because there are pending sends and potential errors
> > > > are not yet known when calling `commitTx()`.
> > > >
> > > >
> > > >
> > > > > In the documentation of commitTransaction, we say if any send throws an
> > > > > error, commitTransaction will too.
> > > >
> > > > Yes. And I think we should keep it this way for an implicit flush. With
> > > > an explicit flush, `commitTransaction()` cannot encounter any send
> > > > errors any longer.
> > > >
> > > >
> > > >
> > > > > It says that all callbacks will be executed, but we ignore the errors of
> > > > > the callbacks.
> > > >
> > > > Ah. Thanks for pointing this out. For this case it's even worse (for
> > > > case (2)), because the user cannot inspect any errors and make any
> > > > decision to ignore or not during an implicit flush...
> > > >
> > > >
> > > >
> > > > > We shouldn't be relying on errors in the callback unless we are
> > > > > calling flush, which we can still do. It seems this has always been the
> > > > > case as well.
> > > >
> > > > Yes, it has always been this way, and my point is to keep it this way
> > > > (option (2) would change it), and not to start allowing users to ignore
> > > > errors with an implicit flush.
> > > >
> > > >
> > > >
> > > > -Matthias
> > > >
> > > >
> > > >
> > > > On 6/24/24 4:57 PM, Justine Olshan wrote:
> > > > > Transaction verification is a concept from KIP-890 referring to the
> > > > > verification that a partition has been added to the transaction. It's
> > > > > not a huge deal, but maybe we don't want to overload the terminology.
> > > > >
> > > > > For option 2, I was a little confused by this
> > > > >
> > > > >>    when commitTx is called, there are still pending Futures and not
> > > > >> all Callbacks are executed yet -- with the implicit flush, we know that
> > > > >> all Callbacks are executed, but even for this case, the user could only
> > > > >> throw an exception inside the Callback to stop the TX from eventually
> > > > >> committing -- Futures cannot be used to make a decision to ignore errors
> > > > >> and commit or not.
> > > > >
> > > > > In the documentation of commitTransaction, we say if any send throws an
> > > > > error, commitTransaction will too.
> > > > >
> > > > > *Further, if any of the {@link #send(ProducerRecord)} calls which were
> > > > > part of the transaction hit irrecoverable errors, this method will throw
> > > > > the last received exception immediately and the transaction will not be
> > > > > committed.*
> > > > >
> > > > > It says that all callbacks will be executed, but we ignore the errors of
> > > > > the callbacks.
> > > > >
> > > > > *If the transaction is committed successfully and this method returns
> > > > > without throwing an exception, it is guaranteed that all {@link Callback
> > > > > callbacks} for records in the transaction will have been invoked and
> > > > > completed. Note that exceptions thrown by callbacks are ignored; the
> > > > > producer proceeds to commit the transaction in any case.*
> > > > >
> > > > > Is it fair to say, though, that for the send errors, we can choose to
> > > > > ignore them? I wasn't understanding where the callbacks come in with
> > > > > your comment. We shouldn't be relying on errors in the callback unless
> > > > > we are calling flush, which we can still do. It seems this has always
> > > > > been the case as well.
> > > > >
> > > > > Justine
> > > > >
> > > > > On Mon, Jun 24, 2024 at 11:07 AM Andrew Schofield <andrew_schofi...@live.com>
> > > > > wrote:
> > > > >
> > > > >> Agreed. Options 1 and 3 are safe. Option 2 is not. I’d be happy with
> > > > >> 3a as the way forward.
> > > > >>
> > > > >> I suggest “TRANSACTION VERIFIED”.
> > > > >>
> > > > >> There isn’t really precedent for options in the producer API. We could
> > > > >> use an enum, which is easy to use but not very future-proof. Or we
> > > > >> could use a class like the admin API does, which is cumbersome but
> > > > >> flexible.
> > > > >>
> > > > >>    CommitTransactionOptions.TRANSACTION_VERIFIED
> > > > >>
> > > > >> or
> > > > >>
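> > > > >>    public class CommitTransactionOptions {
> > > > >>      private boolean transactionVerified = false;
> > > > >>
> > > > >>      public CommitTransactionOptions() { }
> > > > >>
> > > > >>      public CommitTransactionOptions transactionVerified(boolean transactionVerified) {
> > > > >>        this.transactionVerified = transactionVerified;
> > > > >>        return this;
> > > > >>      }
> > > > >>
> > > > >>      public boolean transactionVerified() { return transactionVerified; }
> > > > >>    }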
> > > > >>
> > > > >>
> > > > >> Then 3a is:
> > > > >>
> > > > >>     send(…)
> > > > >>     send(…)
> > > > >>     flush()
> > > > >>     commitTransaction(new CommitTransactionOptions().transactionVerified(true))
> > > > >>
> > > > >>
> > > > >> I’d tend towards the enum here because I doubt we need as much
> > > > >> flexibility as the admin API requires.
> > > > >>
> > > > >> Thanks,
> > > > >> Andrew
> > > > >>
> > > > >>
> > > > >>> On 24 Jun 2024, at 18:39, Matthias J. Sax <mj...@apache.org> wrote:
> > > > >>>
> > > > >>> I am ok either way (ie, flush or commit), but I think we need to
> > > > >>> define exact semantics, and I think there is a subtle thing to
> > > > >>> consider:
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>> 1) flush(Options)
> > > > >>>
> > > > >>> Example:
> > > > >>>
> > > > >>>   send(...)
> > > > >>>   send(...)
> > > > >>>
> > > > >>>   flush(ignoreErrors)
> > > > >>>
> > > > >>>   // at this point, we know that all Futures are completed and all
> > > > >>>   // Callbacks are executed, and we can assume that all user code
> > > > >>>   // checking for errors did execute, before `commitTx` is called
> > > > >>>
> > > > >>>   // I consider this option as safe
> > > > >>>
> > > > >>>   commitTx()
> > > > >>>
> > > > >>>
> > > > >>> 2) commitTx(Option)
> > > > >>>
> > > > >>> Example:
> > > > >>>
> > > > >>>   send(...)
> > > > >>>   send(...)
> > > > >>>
> > > > >>>   // when commitTx is called, there are still pending Futures and not
> > > > >>>   // all Callbacks are executed yet -- with the implicit flush, we
> > > > >>>   // know that all Callbacks are executed, but even for this case, the
> > > > >>>   // user could only throw an exception inside the Callback to stop
> > > > >>>   // the TX from eventually committing -- Futures cannot be used to
> > > > >>>   // make a decision to ignore errors and commit or not.
> > > > >>>
> > > > >>>   // I consider this option not as safe
> > > > >>>
> > > > >>>   commitTx(ignoreErrors)
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>> 3a) required flush + commitTx(Option)
> > > > >>>
> > > > >>> Example:
> > > > >>>
> > > > >>>   send(...)
> > > > >>>   send(...)
> > > > >>>
> > > > >>>   flush()
> > > > >>>
> > > > >>>   // at this point, we know that all Futures are completed and all
> > > > >>>   // Callbacks are executed, and we can assume that all user code
> > > > >>>   // checking for errors did execute, before `commitTx` is called
> > > > >>>
> > > > >>>   // I consider this option as safe
> > > > >>>
> > > > >>>   commitTx(ignoreErrors)
> > > > >>>
> > > > >>>
> > > > >>> 3b) missing flush + commitTx(Option)
> > > > >>>
> > > > >>> Example:
> > > > >>>
> > > > >>>   send(...)
> > > > >>>   send(...)
> > > > >>>
> > > > >>>   // as flush() was not called explicitly, we should ignore the
> > > > >>>   // `ignoreErrors` flag and always throw an exception if the
> > > > >>>   // producer is in error state, because we cannot be sure that the
> > > > >>>   // user did all required checks for error handling
> > > > >>>
> > > > >>>   commitTx(ignoreErrors)
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>> The only issue with option (3) is that it's more complex and its
> > > > >>> semantics are more subtle. But it might be a good (necessary?) bridge
> > > > >>> between (1) and (2): (3) is semantically sound (we ignore errors by
> > > > >>> passing a flag into commitTx() instead of flush()), and at the same
> > > > >>> time safe (we force users to explicitly flush() and [hopefully] do
> > > > >>> proper error handling, and don't rely on an implicit flush() during
> > > > >>> commitTx(), which might be error prone).
> > > > >>>
> > > > >>> (We also need to find a good and descriptive name for the flag we
> > > > >>> pass into `commitTx()` for this case.)
> > > > >>>
> > > > >>>
> > > > >>> -Matthias
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>> On 6/24/24 8:51 AM, Andrew Schofield wrote:
> > > > >>>> Hi Chris,
> > > > >>>> That works for me too. I slightly prefer an option on flush(), but
> > > > >>>> what you suggested works too.
> > > > >>>> Thanks,
> > > > >>>> Andrew
> > > > >>>>> On 24 Jun 2024, at 15:14, Chris Egerton <chr...@aiven.io.INVALID> wrote:
> > > > >>>>>
> > > > >>>>> Hi Andrew,
> > > > >>>>>
> > > > >>>>> I like a lot of what you said, but I still believe it's better to
> > > > >>>>> override commitTransaction than flush. Users will already have to
> > > > >>>>> manually opt in to ignoring errors encountered during transactions,
> > > > >>>>> and we can document recommended usage (i.e., explicitly invoking
> > > > >>>>> flush() before invoking commitTransaction(ignoreRecordErrors)) in the
> > > > >>>>> newly-introduced method. I don't believe it's worth the increased
> > > > >>>>> cognitive load on users with non-transactional producers to
> > > > >>>>> introduce an overloaded flush() variant.
> > > > >>>>>
> > > > >>>>> Cheers,
> > > > >>>>>
> > > > >>>>> Chris
> > > > >>>>>
> > > > >>>>> On Mon, Jun 24, 2024 at 9:39 AM Andrew Schofield <andrew_schofi...@live.com>
> > > > >>>>> wrote:
> > > > >>>>>
> > > > >>>>>> Hi Alieh,
> > > > >>>>>> Thanks for driving this. Unfortunately, there are many parts of the
> > > > >>>>>> API which are a bit unfortunate and it’s tricky to make small
> > > > >>>>>> improvements that don’t have downsides.
> > > > >>>>>>
> > > > >>>>>> I don’t like the idea of using a configuration because configuration
> > > > >>>>>> is often outside the application and changing the behaviour of
> > > > >>>>>> someone else’s application without understanding it is risky.
> > > > >>>>>> Anything which embeds a transactional producer could have its
> > > > >>>>>> behaviour changed unexpectedly.
> > > > >>>>>>
> > > > >>>>>> It would have been much nicer if send() didn’t fail silently and
> > > > >>>>>> change the transaction state. But, because it’s an asynchronous
> > > > >>>>>> operation, I don’t really think we can just make it throw all
> > > > >>>>>> exceptions, even though I really think that `send()` is the method
> > > > >>>>>> with the problem here.
> > > > >>>>>>
> > > > >>>>>> The contract of `flush()` is that it makes sure that all preceding
> > > > >>>>>> sends will have completed, so it should be true that a well-written
> > > > >>>>>> application would be able to know which records were OK because of
> > > > >>>>>> the Future<RecordMetadata> returned by the `send()` method. It
> > > > >>>>>> should be able to determine whether it wants to commit the
> > > > >>>>>> transaction even if some of the intended operations didn’t succeed.
> > > > >>>>>>
> > > > >>>>>> What we don’t currently have is a way for the application to say to
> > > > >>>>>> the KafkaProducer that it knows the outcome of sending the records
> > > > >>>>>> and to confirm that it wants to proceed. Then it would not be
> > > > >>>>>> necessary for `commitTransaction()` to throw an exception to report
> > > > >>>>>> a historical error which the application might choose to ignore.
> > > > >>>>>>
> > > > >>>>>> Having read the comments, I think the KIP is on the right lines
> > > > >>>>>> focusing on the `flush()` method. My suggestion is that we introduce
> > > > >>>>>> an option on `flush()` to be used before `commitTransaction()` for
> > > > >>>>>> applications that want to be able to commit transactions which had
> > > > >>>>>> known failed operations.
> > > > >>>>>>
> > > > >>>>>> The code would be:
> > > > >>>>>>
> > > > >>>>>>    producer.beginTransaction();
> > > > >>>>>>
> > > > >>>>>>    future1 = producer.send(goodRecord1);
> > > > >>>>>>    // The future from this call will complete exceptionally
> > > > >>>>>>    future2 = producer.send(badRecord);
> > > > >>>>>>    future3 = producer.send(goodRecord2);
> > > > >>>>>>
> > > > >>>>>>    producer.flush(FlushOption.TRANSACTION_READY);
> > > > >>>>>>
> > > > >>>>>>    // At this point, we know that all 3 futures are complete and the
> > > > >>>>>>    // transaction contains 2 records
> > > > >>>>>>    producer.commitTransaction();
> > > > >>>>>>
> > > > >>>>>> I wouldn’t deprecate `flush()` with no option. It just uses the
> > > > >>>>>> default option which behaves like today.
> > > > >>>>>>
> > > > >>>>>> Why did I suggest an option on `flush()` rather than
> > > > >>>>>> `commitTransaction()`? Because with `flush()`, it’s clear when the
> > > > >>>>>> application is stating that it’s seen all of the results from its
> > > > >>>>>> `send()` calls and it’s ready to proceed. If it has to rely on
> > > > >>>>>> flushing that occurs inside `commitTransaction()`, I don’t see it as
> > > > >>>>>> clear-cut.
> > > > >>>>>>
> > > > >>>>>> Thanks,
> > > > >>>>>> Andrew
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>> On 24 Jun 2024, at 13:44, Alieh Saeedi <asae...@confluent.io.INVALID> wrote:
> > > > >>>>>>>
> > > > >>>>>>> Hi all,
> > > > >>>>>>> Thanks for the interesting discussion.
> > > > >>>>>>>
> > > > >>>>>>> I assume that now the main questions are as follows:
> > > > >>>>>>>
> > > > >>>>>>> 1. Do we need to transition the transaction to the error state for
> > > > >>>>>>> API exceptions?
> > > > >>>>>>> 2. Should we throw the API exception in `send()` instead of
> > > > >>>>>>> returning a future error?
> > > > >>>>>>> 3. If the answer to question (1) is NO and to question (2) is YES,
> > > > >>>>>>> do we need to change the current `flush` or `commitTnx` at all?
> > > > >>>>>>>
> > > > >>>>>>> Cheers,
> > > > >>>>>>> Alieh
> > > > >>>>>>>
> > > > >>>>>>> On Sat, Jun 22, 2024 at 3:21 AM Matthias J. Sax <mj...@apache.org>
> > > > >>>>>>> wrote:
> > > > >>>>>>>
> > > > >>>>>>>> Hey Kirk,
> > > > >>>>>>>>
> > > > >>>>>>>> can you elaborate on a few points?
> > > > >>>>>>>>
> > > > >>>>>>>>> Otherwise users would have to know to explicitly change their code
> > > > >>>>>>>>> to invoke flush().
> > > > >>>>>>>>
> > > > >>>>>>>> Why? If we add an option to `flush(FlushOption)`, the existing
> > > > >>>>>>>> `flush()` w/o any option will still be there, right? If we really
> > > > >>>>>>>> deprecated the existing `flush()`, it would just mean that we would
> > > > >>>>>>>> pass a "default FlushOption" into an implicit flush (and yes, we
> > > > >>>>>>>> would need to define what this would be).
> > > > >>>>>>>>
> > > > >>>>>>>> I think there is no clear winner (as pointed out in my last reply),
> > > > >>>>>>>> and both `flush(FlushOption)` and `commitTx(CommitOption)` have
> > > > >>>>>>>> advantages and drawbacks. I guess we just need to agree on which
> > > > >>>>>>>> tradeoff we want to move forward with?
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>> Not sure if your database example is a 1:1 fit? I think the better
> > > > >>>>>>>> comparison would be:
> > > > >>>>>>>>
> > > > >>>>>>>> BEGIN TRANSACTION;
> > > > >>>>>>>> INSERT INTO foo VALUES ('a');
> > > > >>>>>>>> INSERT INTO foo VALUES ('b');
> > > > >>>>>>>> INSERT INTO foo VALUES ('c');
> > > > >>>>>>>> INSERT INTO foo VALUES ('not sure');
> > > > >>>>>>>>
> > > > >>>>>>>> For this case, the full TX would roll back, right? I still think that
> > > > >>>>>>>> allowing users to just skip over the last error and continue the TX
> > > > >>>>>>>> would be ok. In the end, we provide a programmatic API, not a
> > > > >>>>>>>> declarative one like SQL. Of course, the default behavior would still
> > > > >>>>>>>> be to put the producer into error state, and the user would need to
> > > > >>>>>>>> call `abortTransaction()` to move forward.
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>> -Matthias
> > > > >>>>>>>>
> > > > >>>>>>>> On 6/21/24 5:26 PM, Kirk True wrote:
> > > > >>>>>>>>> Hi Matthias,
> > > > >>>>>>>>>
> > > > >>>>>>>>>> On Jun 21, 2024, at 12:28 PM, Matthias J. Sax <mj...@apache.org> wrote:
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> If we want to limit it to `RecordTooLargeException`, throwing from
> > > > >>>>>>>>>> `send()` directly makes sense. Thanks for calling it out.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> It's still a question of backward compatibility? `send()` does throw
> > > > >>>>>>>>>> exceptions already, including generic `KafkaException`. Not sure if
> > > > >>>>>>>>>> this helps with backward compatibility? Could we just add a new
> > > > >>>>>>>>>> exception type (which is a child of `KafkaException`)?
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> The Producer JavaDocs are not totally explicit about it IMHO.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> I think we could expect that some generic error handling path gets
> > > > >>>>>>>>>> executed. For the TX case, I would assume that a TX would be aborted
> > > > >>>>>>>>>> if `send()` throws or that the producer would be `closed()`. Overall
> > > > >>>>>>>>>> this might be safe?
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> It would be a little less flexible though, since (as you note) it
> > > > >>>>>>>>>>> would still be impossible to commit transactions after errors have
> > > > >>>>>>>>>>> been reported from brokers.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> KS would still need a way to clear the error state of the producer.
> > > > >>>>>>>>>> We could catch a `RecordTooLargeException` from `send()`, call the
> > > > >>>>>>>>>> handler and let it decide what to do next. But if it does return
> > > > >>>>>>>>>> `CONTINUE` to swallow the error and drop the poison pill record on
> > > > >>>>>>>>>> the floor, we would want to move forward and commit the transaction.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> But the question is: if we cannot add a record to the tx, does the
> > > > >>>>>>>>>> producer need to go into error state? In the end, we did throw and
> > > > >>>>>>>>>> inform the app that the record was _not_ added, and it's up to the
> > > > >>>>>>>>>> app to decide what to do next?
> > > > >>>>>>>>>
> > > > >>>>>>>>> That’s an excellent question…
> > > > >>>>>>>>>
> > > > >>>>>>>>> Imagine the user’s application is writing information to a database
> > > > >>>>>>>>> instead of Kafka. If there’s a table with a CHAR(1) column and this
> > > > >>>>>>>>> SQL statement was attempted, what should happen?
> > > > >>>>>>>>>
> > > > >>>>>>>>>     INSERT INTO foo VALUES ('not sure');
> > > > >>>>>>>>>
> > > > >>>>>>>>> Yes, that DML would fail, sure, but would the user expect that the
> > > > >>>>>>>>> connection used by the database library would get stuck in some kind
> > > > >>>>>>>>> of error state? A user would be able to catch the error and either
> > > > >>>>>>>>> continue or abort, based on their business rules.
> > > > >>>>>>>>>
> > > > >>>>>>>>> So I agree with what I believe you’re implying: we shouldn’t poison
> > > > >>>>>>>>> the Producer/TransactionManager on certain types of
> > > > >>>>>>>>> application-level errors in send().
> > > > >>>>>>>>>
> > > > >>>>>>>>> Kirk
> > > > >>>>>>>>>
> > > > >>>>>>>>>> If we report the error only via the `Callback`, it's a different
> > > > >>>>>>>>>> story, because the contract for this case is clearly specified in
> > > > >>>>>>>>>> the JavaDocs:
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> When used as part of a transaction, it is not necessary to define a
> > > > >>>>>>>>>>> callback or check the result of the future in order to detect
> > > > >>>>>>>>>>> errors from <code>send</code>. If any of the send calls failed with
> > > > >>>>>>>>>>> an irrecoverable error, the final {@link #commitTransaction()} call
> > > > >>>>>>>>>>> will fail and throw the exception from the last failed send. When
> > > > >>>>>>>>>>> this happens, your application should call {@link
> > > > >>>>>>>>>>> #abortTransaction()} to reset the state and continue to send data.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> -Matthias
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> On 6/21/24 11:42 AM, Chris Egerton wrote:
> > > > >>>>>>>>>>> Hi Artem,
> > > > >>>>>>>>>>> I think it'd make sense to throw directly from send whenever
> > > > >>>>>>>>>>> possible, instead of returning an already-completed future. I didn't
> > > > >>>>>>>>>>> do that in my bug fix to try to be conservative about breaking
> > > > >>>>>>>>>>> changes, but this seems to have caused its own set of headaches. It
> > > > >>>>>>>>>>> would be a little less flexible though, since (as you note) it would
> > > > >>>>>>>>>>> still be impossible to commit transactions after errors have been
> > > > >>>>>>>>>>> reported from brokers.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> I'll leave it up to the Kafka Streams folks to decide if that
> > > > >>>>>>>>>>> flexibility is required. If it is, then users could explicitly call
> > > > >>>>>>>>>>> flush() before committing (and ignoring errors for) or aborting a
> > > > >>>>>>>>>>> transaction, if they want to implement fine-grained error handling
> > > > >>>>>>>>>>> logic such as allowing errors for a subset of topics to be ignored.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Hi Matthias,
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Most of the time you're right and we can't throw from send();
> > > > >>>>>>>>>>> however, in this case (client-side record-too-large exception), the
> > > > >>>>>>>>>>> error is actually noticed by the producer before send() returns, so
> > > > >>>>>>>>>>> it should be possible to throw directly.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Cheers,
> > > > >>>>>>>>>>> Chris
> > > > >>>>>>>>>>> On Fri, Jun 21, 2024, 14:25 Matthias J. Sax <mj...@apache.org>
> > > > >>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>> Not sure if we can change send and make it throw, given that send()
> > > > >>>>>>>>>>>> is async? That is why users can register a `Callback` to begin
> > > > >>>>>>>>>>>> with, right?
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> And Alieh's point about backward compatibility is also a fair
> > > > >>>>>>>>>>>> concern.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Actually, this would potentially be even worse than the original
> > > > >>>>>>>>>>>>> buggy behavior because the bug was that we ignored errors that
> > > > >>>>>>>>>>>>> happened in the "send()" method itself, not necessarily the ones
> > > > >>>>>>>>>>>>> that we got from the broker.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> My understanding was that `commitTx(swallowError)` would only
> > > > >>>>>>>>>>>> swallow `send()` errors, not errors about the actual commit. I
> > > > >>>>>>>>>>>> agree that it would be very bad to swallow errors about the actual
> > > > >>>>>>>>>>>> tx commit...
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> It's a fair question if this might be too subtle; to make it
> > > > >>>>>>>>>>>> explicit, we could use `CommitOptions#ignorePendingSendErrors()`
> > > > >>>>>>>>>>>> [working name] to make it clear.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> If we think it's too subtle to change commit to swallow send()
> > > > >>>>>>>>>>>> errors, maybe going with `flush(FlushOptions)` would be clearer
> > > > >>>>>>>>>>>> (and we can use `FlushOption#swallowSendErrorsForTransactions()`
> > > > >>>>>>>>>>>> [working name] to make it explicit that the `FlushOption` for now
> > > > >>>>>>>>>>>> only has an effect for TX).
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Thoughts?
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> -Matthias
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> On 6/21/24 4:10 AM, Alieh Saeedi wrote:
> > > > >>>>>>>>>>>>> Hi all,
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> It is very exciting to see all the experts here raising very good
> > > > >>>>>>>>>>>>> points.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> As we go further, we see more and more options to improve our
> > > > >>>>>>>>>>>>> solution, which makes concluding and updating the KIP impossible.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> The main suggestions so far are:
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 1. `flush` with `flushOptions` as input parameter
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 2. `commitTx` with `commitOptions` as input parameter
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 3. `send` must throw the exception
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> My concern about the 3rd suggestion:
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 1. Does the change cause any issue with backward compatibility?
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 2. The `send (bad record)` already transitions the transaction to
> > > > >>>>>>>>>>>>> the error state. No user, including Streams, is able to transition
> > > > >>>>>>>>>>>>> the transaction back from the error state. Do you mean we remove the
> > > > >>>>>>>>>>>>> `maybeTransitionToErrorState(e)` from here
> > > > >>>>>>>>>>>>> <https://github.com/apache/kafka/blob/9b5b434e2a6b2d5290ea403fc02859b1c523d8aa/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1112>
> > > > >>>>>>>>>>>>> as well?
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Cheers,
> > > > >>>>>>>>>>>>> Alieh
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> On Fri, Jun 21, 2024 at 8:45 AM Andrew Schofield <andrew_schofi...@live.com>
> > > > >>>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Hi Artem,
> > > > >>>>>>>>>>>>>> I think you make a good point which is worth further consideration.
> > > > >>>>>>>>>>>>>> If any of the existing methods is really ripe for a change here,
> > > > >>>>>>>>>>>>>> it’s the send() that actually caused the problem. If that can be
> > > > >>>>>>>>>>>>>> fixed so there are no situations in which a lurking error breaks a
> > > > >>>>>>>>>>>>>> transaction, that might be the best.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Thanks,
> > > > >>>>>>>>>>>>>> Andrew
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> On 21 Jun 2024, at 01:51, Artem Livshits <alivsh...@confluent.io.INVALID> wrote:
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> I thought we still wait for requests (and their errors) to come in and could handle fatal errors appropriately.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> We do wait for requests, but my understanding is that when we call commitTransaction("ignore send errors"), we want to ignore errors. So if we do
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> 1. send
> > > > >>>>>>>>>>>>>>> 2. commitTransaction("ignore send errors")
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> the commit will succeed. You can look at the example in https://issues.apache.org/jira/browse/KAFKA-9279 and just substitute commitTransaction with commitTransaction("ignore send errors"), and we get the buggy behavior back :-). Actually, this would potentially be even worse than the original buggy behavior, because the bug was that we ignored errors that happened in the "send()" method itself, not necessarily the ones that we got from the broker.
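The send-then-commit scenario above can be made concrete with a small Java toy model. This is a sketch under stated assumptions, not the real `KafkaProducer`: the classes `ToyTxProducer` and `Kafka9279Demo`, the "records longer than 10 characters fail" rule, and the `ignoreSendErrors` flag are all hypothetical stand-ins for the commit option being discussed. With the flag off, the parked send error blocks the commit; with it on, the commit succeeds and the failed record is silently missing.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a transactional producer; NOT the real KafkaProducer API.
class ToyTxProducer {
    private final List<String> pending = new ArrayList<>();
    private final List<String> committed = new ArrayList<>();
    private RuntimeException lastSendError; // error parked in the "transaction state"

    void send(String record) {
        // Hypothetical validation: records longer than 10 chars "fail" in send().
        if (record.length() > 10) {
            lastSendError = new IllegalArgumentException("record too large: " + record);
            return; // error is recorded for commit to find, not thrown here
        }
        pending.add(record);
    }

    // ignoreSendErrors models the hypothetical "ignore send errors" commit option.
    void commitTransaction(boolean ignoreSendErrors) {
        if (lastSendError != null && !ignoreSendErrors) {
            throw new IllegalStateException("cannot commit", lastSendError);
        }
        lastSendError = null;
        committed.addAll(pending);
        pending.clear();
    }

    List<String> committed() {
        return committed;
    }
}

class Kafka9279Demo {
    // send(good), send(bad), commit: reports what ended up committed.
    static String run(boolean ignoreSendErrors) {
        ToyTxProducer producer = new ToyTxProducer();
        producer.send("a");
        producer.send("this-record-is-too-large");
        try {
            producer.commitTransaction(ignoreSendErrors);
        } catch (IllegalStateException e) {
            return "commit failed";
        }
        return "committed " + producer.committed();
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // commit failed
        System.out.println(run(true));  // committed [a]  (bad record silently lost)
    }
}
```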
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Actually, looking at https://github.com/apache/kafka/pull/11508/files, wouldn't a better solution be to just throw the error from the "send" method itself, rather than trying to set it to be thrown during commit? This way the example in https://issues.apache.org/jira/browse/KAFKA-9279 would be fixed, and at the same time it would give KS an opportunity to catch the error and ignore it if needed. Not sure if we need a KIP for that; we could just do a better fix of the old bug.
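A toy Java sketch of this alternative, assuming `send()` throws the validation error synchronously instead of parking it for commit time. `ThrowingSendProducer`, `SendThrowsDemo` and the size rule are hypothetical illustrations, not Kafka APIs. The caller decides per record whether to skip it and keep going, or to stop before committing.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, NOT KafkaProducer: send() reports the error by throwing,
// instead of parking it in the transaction state for commit to find.
class ThrowingSendProducer {
    private final List<String> pending = new ArrayList<>();
    private final List<String> committed = new ArrayList<>();

    void send(String record) {
        // Hypothetical validation: records longer than 10 chars are rejected.
        if (record.length() > 10) {
            throw new IllegalArgumentException("record too large: " + record);
        }
        pending.add(record);
    }

    void commitTransaction() {
        committed.addAll(pending);
        pending.clear();
    }

    List<String> committed() {
        return committed;
    }
}

class SendThrowsDemo {
    // The caller decides, per record, whether a send error is tolerable.
    static String run(boolean skipBadRecords) {
        ThrowingSendProducer producer = new ThrowingSendProducer();
        for (String record : List.of("a", "this-record-is-too-large", "b")) {
            try {
                producer.send(record);
            } catch (IllegalArgumentException e) {
                if (!skipBadRecords) {
                    // Stop before commit; a real application would abort here.
                    return "not committed: " + e.getMessage();
                }
                // Otherwise skip the record (e.g. log it) and keep going.
            }
        }
        producer.commitTransaction();
        return "committed " + producer.committed();
    }
}
```

The trade-off is that callers not prepared for the exception would now see it propagate out of `send()`, which is the backward-compatibility question Alieh raises.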
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> -Artem
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> On Thu, Jun 20, 2024 at 4:58 PM Justine Olshan <jols...@confluent.io.invalid> wrote:
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> I'm a bit late to the party, but the discussion here looks reasonable. Moving the logic to a transactional method makes sense to me and makes me feel a bit better about keeping the complexity in the methods relevant to the issue.
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> One minor concern is that if we set "ignore send errors" (or whatever we decide to name it) option without explicit flush, it'll actually lead to broken behavior as the application won't be able to stop a commit from proceeding even on fatal errors.
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> Is this with respect to the case where a request is still in flight when we call commitTransaction? I thought we still wait for requests (and their errors) to come in and could handle fatal errors appropriately.
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> Justine
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> On Thu, Jun 20, 2024 at 4:32 PM Artem Livshits <alivsh...@confluent.io.invalid> wrote:
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> Hi Matthias (and other folks who suggested ideas),
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> maybe `commitTransaction(CommitOptions)` or similar could be a good way forward?
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> I like this approach. One minor concern is that if we set the "ignore send errors" option (or whatever we decide to name it) without an explicit flush, it'll actually lead to broken behavior, as the application won't be able to stop a commit from proceeding even on fatal errors. But I guess we'll just have to clearly document it.
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> In some way we are basically adding a flag to optionally restore the https://issues.apache.org/jira/browse/KAFKA-9279 bug, which is the motivation for all these changes, anyway :-).
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> -Artem
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> On Thu, Jun 20, 2024 at 2:18 PM Matthias J. Sax <mj...@apache.org> wrote:
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> Seems the option to use a config does not get a lot of support.
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> So we need to go with some form of "overload / new method". I think Chris' point about not coupling it to `flush()` but rather to `commitTransaction()` is actually a very good one; for the non-tx case, the different flush variants would not make sense.
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> I also like Lianet's idea to pass in some "options" object, so maybe `commitTransaction(CommitOptions)` or similar could be a good way forward? It's much better than a `boolean` parameter, aesthetically, as well as being extendable in the future if necessary.
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> Given that we would pass in an optional parameter, we might not even need to deprecate the existing `commitTransaction()` method?
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> -Matthias
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> On 6/20/24 9:12 AM, Andrew Schofield wrote:
> > > > >>>>>>>>>>>>>>>>>>> Hi Alieh,
> > > > >>>>>>>>>>>>>>>>>>> Thanks for the KIP.
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> I *really* don’t like adding a config which changes the behaviour of the flush() method. We already have too many configs. But I totally understand the problem that you’re trying to solve, and some of the other suggestions in this thread seem neater.
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> Personally, I would add another method to KafkaProducer. Not an overload on flush(), because this is not flush() at all. Using Matthias’s options, I prefer (3).
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> Thanks,
> > > > >>>>>>>>>>>>>>>>>>> Andrew
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>> On 20 Jun 2024, at 15:08, Lianet M. <liane...@gmail.com> wrote:
> > > > >>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>> Hi all, thanks for the KIP Alieh!
> > > > >>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>> LM1. Totally agree with Artem's point about the config not being the most explicit/flexible way to express this capability. Turning then to Matthias' 4 options, what I don't like about 3 and 4 is that it seems they might not age very well? Aren't we going to want some other twist to the flush semantics that will have us adding yet another param to it, or another overloaded method? I truly don't have the context to answer that, but if it feels like a realistic future, maybe adding some kind of FlushOptions param to flush would be better from an extensibility point of view. It would only have the clearErrors option available for now, but could accept any others we may need. I find that this would remove the "ugliness" Matthias pointed out for 3. and 4.
> > > > >>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>> LM2. No matter how we end up expressing the different semantics for flush, let's make sure we update the flush and commitTransaction Javadocs in the KIP. They currently state that flush "clears the last exception" and that commitTransaction "will NOT throw" if called after flush, but it really all depends on the config/options/method used.
> > > > >>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>> LM3. I find it would be helpful to include an example to show the new flow that we're unblocking (I see this as the great gain here): flush with clear error option enabled -> catch and do whatever error handling we want -> commitTransaction successfully
> > > > >>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>> Thanks!
> > > > >>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>> Lianet
> > > > >>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>> On Wed, Jun 19, 2024 at 11:26 PM Chris Egerton <fearthecel...@gmail.com> wrote:
> > > > >>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>> Hi Matthias,
> > > > >>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>> I like the alternatives you've listed. One more that might help is if, instead of overloading flush(), we overloaded commitTransaction() to something like commitTransaction(boolean tolerateRecordErrors). This seems slightly cleaner in that it takes the behavioral change we want, which only applies to transactional producers, to an API method that is only used for transactional producers. It would also avoid the issue of whether or not flush() (or a new variant of it with altered semantics) should throw or not. Thoughts?
> > > > >>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>> Hi Alieh,
> > > > >>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP, I like this direction a lot more than the pluggable handler!
> > > > >>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>> I share Artem's concerns that enabling this behavior via configuration doesn't seem like a great fit. It's likely that application code will be written in a style that only works with one type of behavior from transactional producers, so requiring that application code to declare its expectations for the behavior of its producer seems more appropriate than, e.g., allowing users deploying that application to tweak a configuration file that gets fed to producers spun up inside it.
> > > > >>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>> Cheers,
> > > > >>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>> Chris
> > > > >>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>> On Wed, Jun 19, 2024 at 10:32 PM Matthias J. Sax <mj...@apache.org> wrote:
> > > > >>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP Alieh. I actually like the KIP as-is, but think Artem raises very good points...
> > > > >>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>>> Seems we have four options on how to move forward?
> > > > >>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>>>   1. add config to allow "silent error clearance" as the KIP proposes
> > > > >>>>>>>>>>>>>>>>>>>>>>>   2. change flush() to clear error and let it throw
> > > > >>>>>>>>>>>>>>>>>>>>>>>   3. add new `flushAndThrow()` (or better name) which clears error and throws
> > > > >>>>>>>>>>>>>>>>>>>>>>>   4. add `flush(boolean clearAndThrow)` and let user pick (and deprecate existing `flush()`)
> > > > >>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>>> For (2), given that it would be a behavior change, we might also need a public "feature flag" config.
> > > > >>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>>> It seems both (1) and (2) have the issue Artem mentioned. (3) and (4) would be safer to this end; however, for both we kinda get an ugly API?
> > > > >>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>>> Not sure right now if I have any preference. Seems we need to pick some evil and that there is no clear best solution? Would be good to hear from others what they think.
> > > > >>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>> -Matthias
> > > > >>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>> On 6/18/24 8:39 PM, Artem Livshits wrote:
> > > > >>>>>>>>>>>>>>>>>>>>>>> Hi Alieh,
> > > > >>>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>>>> Thank you for the KIP. I have a couple of suggestions:
> > > > >>>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>>>> AL1. We should throw an error from flush after we clear it. This would make it so that both "send + commit" and "send + flush + commit" (the latter looks like just a more verbose way to express the former, and it would be intuitive if it behaved the same) would throw if the transaction has an error (so whichever way the code is written, it's going to be correct). At the same time, the latter could be extended by the caller to intercept exceptions from flush, ignore them as needed, and commit the transaction. This solution would keep basic things simple (if someone has code that doesn't require advanced error handling, then basic "send + flush + commit" would do the right thing) and make advanced things possible: an application can add try + catch around flush and ignore some errors.
> > > > >>>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>>>> AL2. I'm not sure if config is the best way to express the modification of the "flush" semantics: the application logic that calls "flush" needs to match the "flush" semantics, and configuring semantics in a detached place creates room for bugs due to discrepancies. This can be especially bad if the producer loads configuration from a file at run time; in that case, a mistake in configuration could break the application, because it was written to expect one "flush" semantics but the semantics are switched. Given that the "flush" semantics need to match the caller's expectation, a way to accomplish that would be to pass the caller's expectation to the "flush" call by either having a method with a different name or having an overload with a Boolean flag that would configure the semantics (the current method could just redirect to the new one).
> > > > >>>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>>> -Artem
> > > > >>>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>>>> On Mon, Jun 17, 2024 at 9:09 AM Alieh Saeedi <asae...@confluent.io.invalid> wrote:
> > > > >>>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>>>> Hi all,
> > > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>>>>> I'd like to kick off a discussion for KIP-1059 that suggests adding a new feature to the Producer flush() method.
> > > > >>>>>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-1059%3A+Enable+the+Producer+flush%28%29+method+to+clear+the+latest+send%28%29+error>
>
