Hi Artem,

Yes, I completely agree that by default, special action shouldn't be
required from users to prevent transactions from being committed when one
or more records can't be sent. The behavior I was suggesting was only
relevant to the new API we're discussing where we allow users to
intentionally bypass that logic when invoking commitTransaction.

Cheers,

Chris

On Tue, Jun 25, 2024, 01:44 Artem Livshits <alivsh...@confluent.io.invalid>
wrote:

> Hey folks,
>
> Great discussion!
>
> Re: throwing exceptions from send().  send() is documented to throw
> KafkaException, so if the application doesn't handle it, it should be a
> bug.  Now, it does have a note that API exceptions wouldn't be thrown, not
> sure if we have code that relies on that.  There is a reason exceptions
> have classes, they are designed to express a "class of errors" that can be
> handled, so that we don't have to add a flag or a new method every time we
> have a new exception to throw.  But if there is consensus that it's still
> too risky (especially if we have examples of code that gets broken), then I
> agree that we shouldn't do it.
>
> Re: various ways to communicate semantics change.  If we must have 2
> different behaviors, I think passing options to "ignore errors" to
> commitTransaction is probably the most intuitive way to do it.  What I
> really don't like (in any of the options), is that we cannot really
> document it in a way that articulates a value in the product.  There are
> tons of nuances that require understanding some buggy behavior, then fixed
> behavior, then an option to sometimes turn on buggy behavior, and etc.
>
> >  if a user invokes Producer::abortTransaction from within a producer
> callback today
>
> I think we would get invalid state exception.  Which we could probably fix,
> but even if we supported it, I think it would be good if doing send +
> commit would lead to aborted transaction without special action from the
> application -- the simple things should be really simple, any failure
> during send or commit should abort send + commit sequence without special
> handling.
>
> -Artem
>
> On Mon, Jun 24, 2024 at 6:37 PM Chris Egerton <fearthecel...@gmail.com>
> wrote:
>
> > One quick thought: if a user invokes Producer::abortTransaction from
> within
> > a producer callback today, even in the midst of an ongoing call to
> > Producer::commitTransaction, what is the behavior? Would it be reasonable
> > to support this behavior as a way to allow error handling to take place
> > during implicit flushes, via producer callback?
> >
> > On Mon, Jun 24, 2024 at 9:21 PM Matthias J. Sax <mj...@apache.org>
> wrote:
> >
> > > My point it, that it does not seem to be safe to allow users to ignore
> > > errors with an implicit flush, and I think it's better to only allow it
> > > with (ie, after) an explicit flush().
> > >
> > > My reasoning is, that users should make a decision to ignore errors or
> > > not, before calling `commitTx()`, but after inspecting all potential
> > > send errors. With an implicit flush, users need to "blindly" decide to
> > > ignore send errors, because there are pending sends and potential
> errors
> > > are not known yet, when calling `commitTx()`.
> > >
> > >
> > >
> > > > In the documentation of commitTransaction, we say if any send throws
> an
> > > > error, commitTransaction will too.
> > >
> > > Yes. And I think we should keep it this way for an implicit flush. With
> > > an explicit flush, `commitTransaction()` cannot encounter any send
> > > errors any longer.
> > >
> > >
> > >
> > > > It says that all callbacks will be executed, but we ignore the errors
> > of
> > > > the callbacks.
> > >
> > > Ah. Thanks for pointing this out. For this case it's even worse (for
> > > case (2)), because the user cannot inspect any errors and make any
> > > decision to ignore or not during an implicit flush...
> > >
> > >
> > >
> > > > We shouldn't be relying on errors in the callback unless we are
> > > > calling flush, which we can still do. It seems this has always been
> the
> > > > case as well.
> > >
> > > Yes, has always been this way, and my point is to keep it this way
> > > (option (2) would change it), and not start to allow to ignore errors
> > > with an implicit flush.
> > >
> > >
> > >
> > > -Matthias
> > >
> > >
> > >
> > > On 6/24/24 4:57 PM, Justine Olshan wrote:
> > > > Transaction verification is a concept from KIP-890 referring to the
> > > > verification that a partition has been added to the transaction. It's
> > > not a
> > > > huge deal, but maybe we don't want to overload the terminology.
> > > >
> > > > For option 2, I was a little confused by this
> > > >
> > > >>    when commitTx is called, there is still pending Futures and not
> > > > all Callbacks are executed yet -- with the implicit flush, we know
> that
> > > > all Callbacks are executed, but even for this case, the user could
> only
> > > > throw an exception inside the Callback to stop the TX to eventually
> > > > commit -- Futures cannot be used to make a decision to ignore error
> and
> > > > commit or not.
> > > >
> > > > In the documentation of commitTransaction, we say if any send throws
> an
> > > > error, commitTransaction will too.
> > > >
> > > > *Further, if any of the {@link #send(ProducerRecord)} calls which
> were
> > > part
> > > > of the transaction hit irrecoverable errors, this method will throw
> the
> > > > last received exception immediately and the transaction will not be
> > > > committed.*
> > > >
> > > > It says that all callbacks will be executed, but we ignore the errors
> > of
> > > > the callbacks.
> > > >
> > > > *If the transaction is committed successfully and this method returns
> > > > without throwing an exception, it is guaranteed that all {@link
> > Callback
> > > > callbacks} for records in the transaction will have been invoked and
> > > > completed. Note that exceptions thrown by callbacks are ignored; the
> > > > producer proceeds to commit the transaction in any case.*
> > > >
> > > > Is it fair to say though that for the send errors, we can choose to
> > > ignore
> > > > them? II wasn't understanding where the callbacks come in with your
> > > > comment. We shouldn't be relying on errors in the callback unless we
> > are
> > > > calling flush, which we can still do. It seems this has always been
> the
> > > > case as well.
> > > >
> > > > Justine
> > > >
> > > > On Mon, Jun 24, 2024 at 11:07 AM Andrew Schofield <
> > > andrew_schofi...@live.com>
> > > > wrote:
> > > >
> > > >> Agreed. Options 1 and 3 are safe. Option 2 is not. I’d be happy with
> > 3a
> > > as
> > > >> the way.
> > > >>
> > > >> I suggest “TRANSACTION VERIFIED”.
> > > >>
> > > >> There isn’t really precedent for options in the producer API. We
> could
> > > use
> > > >> an enum,
> > > >> which is easy to use and not very future-proof. Or we could use a
> > class
> > > >> like the
> > > >> admin API does, which is cumbersome and flexible.
> > > >>
> > > >>    CommitTransactionOptions.TRANSACTION_VERIFIED
> > > >>
> > > >> or
> > > >>
> > > >>    public class CommitTransactionOptions {
> > > >>      public CommitTransactionOptions();
> > > >>
> > > >>      CommitTransactionOptions transactionVerified(boolean
> > > >> transactionVerified);
> > > >>
> > > >>      boolean transactionVerified();
> > > >>    }
> > > >>
> > > >>
> > > >> Then 3b is:
> > > >>
> > > >>     send(…)
> > > >>     send(…)
> > > >>     flush()
> > > >>     commitTransaction(new
> > > >> CommitTransactionOptions().transactionVerified(true))
> > > >>
> > > >>
> > > >> I’d tend towards the enum here because I doubt we need as much
> > > flexibility
> > > >> as the admin API requires.
> > > >>
> > > >> Thanks,
> > > >> Andrew
> > > >>
> > > >>
> > > >>> On 24 Jun 2024, at 18:39, Matthias J. Sax <mj...@apache.org>
> wrote:
> > > >>>
> > > >>> I am ok either way (ie, flush or commit), but I think we need to
> > define
> > > >> exact semantics, and I think there is some subtle thing to consider:
> > > >>>
> > > >>>
> > > >>>
> > > >>> 1) flush(Options)
> > > >>>
> > > >>> Example:
> > > >>>
> > > >>>   send(...)
> > > >>>   send(...)
> > > >>>
> > > >>>   flush(ignoreErrors)
> > > >>>
> > > >>>   // at this point, we know that all Futures are completed and all
> > > >> Callbacks are executed, and we can assume that all user code
> checking
> > > for
> > > >> errors did execute, before `commitTx` is called
> > > >>>
> > > >>>   // I consider this option as safe
> > > >>>
> > > >>>   commitTx()
> > > >>>
> > > >>>
> > > >>> 2) commitTx(Option)
> > > >>>
> > > >>> Example:
> > > >>>
> > > >>>   send(...)
> > > >>>   send(...)
> > > >>>
> > > >>>   // when commitTx is called, there is still pending Futures and
> not
> > > all
> > > >> Callbacks are executed yet -- with the implicit flush, we know that
> > all
> > > >> Callbacks are executed, but even for this case, the user could only
> > > throw
> > > >> an exception inside the Callback to stop the TX to eventually commit
> > --
> > > >> Futures cannot be used to make a decision to ignore error and commit
> > or
> > > not.
> > > >>>
> > > >>>   // I consider this option not as safe
> > > >>>
> > > >>>   commitTx(igrnoreErrors)
> > > >>>
> > > >>>
> > > >>>
> > > >>> 3a) required flush + commitTx(Option)
> > > >>>
> > > >>> Example:
> > > >>>
> > > >>>   send(...)
> > > >>>   send(...)
> > > >>>
> > > >>>   flush()
> > > >>>
> > > >>>   // at this point, we know that all Future are completed and all
> > > >> Callbacks are executed, and we can assume that all user code
> checking
> > > for
> > > >> error did execute, before `commitTx` is called
> > > >>>
> > > >>>   // I consider this option as safe
> > > >>>
> > > >>>   commitTx(ignoreErrors)
> > > >>>
> > > >>>
> > > >>> 3b) missing flush + commitTx(Option)
> > > >>>
> > > >>> Example:
> > > >>>
> > > >>>   send(...)
> > > >>>   send(...)
> > > >>>
> > > >>>   // as flush() was not called explicitly, we should ignore
> > > >> `ignoreErrors` flag and always throw an exception if the producer is
> > in
> > > >> error state, because we cannot be sure that the user did all
> required
> > > check
> > > >> for error handling
> > > >>>
> > > >>>   commitTx(ignoreErrors)
> > > >>>
> > > >>>
> > > >>>
> > > >>> The only issue with option (3) is, that it's more complex and
> > semantics
> > > >> are more subtle. But it might be the a good (necessary?) bridge
> > between
> > > (1)
> > > >> and (2): (3) is semantically sound (we ignore errors via passing a
> > flag
> > > >> into commitTx() instead of flush()), and at the same time safe (we
> > force
> > > >> users to explicitly flush() and [hopefully] do proper error
> handling,
> > > and
> > > >> don't rely in am implicit flush() during commitTx() which might be
> > error
> > > >> prone).
> > > >>>
> > > >>> (Also need to find a good and descriptive name for the flag we pass
> > > into
> > > >> `commitTx()` for this case.)
> > > >>>
> > > >>>
> > > >>> -Matthias
> > > >>>
> > > >>>
> > > >>>
> > > >>> On 6/24/24 8:51 AM, Andrew Schofield wrote:
> > > >>>> Hi Chris,
> > > >>>> That works for me too. I slightly prefer an option on flush(), but
> > > what
> > > >> you suggested
> > > >>>> works too.
> > > >>>> Thanks,
> > > >>>> Andrew
> > > >>>>> On 24 Jun 2024, at 15:14, Chris Egerton <chr...@aiven.io.INVALID
> >
> > > >> wrote:
> > > >>>>>
> > > >>>>> Hi Andrew,
> > > >>>>>
> > > >>>>> I like a lot of what you said, but I still believe it's better to
> > > >> override
> > > >>>>> commitTransaction than flush. Users will already have to manually
> > opt
> > > >> in to
> > > >>>>> ignoring errors encountered during transactions, and we can
> > document
> > > >>>>> recommended usage (i.e., explicitly invoking flush() before
> > invoking
> > > >>>>> commitTransaction(ignoreRecordErrors)) in the newly-introduced
> > > method.
> > > >> I
> > > >>>>> don't believe it's worth the increased cognitive load on users
> with
> > > >>>>> non-transactional producers to introduce an overloaded flush()
> > > variant.
> > > >>>>>
> > > >>>>> Cheers,
> > > >>>>>
> > > >>>>> Chris
> > > >>>>>
> > > >>>>> On Mon, Jun 24, 2024 at 9:39 AM Andrew Schofield <
> > > >> andrew_schofi...@live.com>
> > > >>>>> wrote:
> > > >>>>>
> > > >>>>>> Hi Alieh,
> > > >>>>>> Thanks for driving this. Unfortunately, there are many parts of
> > the
> > > >> API
> > > >>>>>> which
> > > >>>>>> are a bit unfortunate and it’s tricky to make small improvements
> > > that
> > > >>>>>> don’t have
> > > >>>>>> downsides.
> > > >>>>>>
> > > >>>>>> I don’t like the idea of using a configuration because
> > configuration
> > > >> is
> > > >>>>>> often
> > > >>>>>> outside the application and changing the behaviour of someone
> > else’s
> > > >>>>>> application
> > > >>>>>> without understanding it is risky. Anything which embeds a
> > > >> transactional
> > > >>>>>> producer
> > > >>>>>> could have its behaviour changed unexpectedly.
> > > >>>>>>
> > > >>>>>> It would be been much nicer if send() didn’t fail silently and
> > > change
> > > >> the
> > > >>>>>> transaction
> > > >>>>>> state. But, because it’s an asynchronous operation, I don’t
> really
> > > >> think
> > > >>>>>> we can
> > > >>>>>> just make it throw all exceptions, even though I really think
> that
> > > >>>>>> `send()` is the
> > > >>>>>> method with the problem here.
> > > >>>>>>
> > > >>>>>> The contract of `flush()` is that it makes sure that all
> preceding
> > > >> sends
> > > >>>>>> will have
> > > >>>>>> completed, so it should be true that a well written application
> > > would
> > > >> be
> > > >>>>>> able to
> > > >>>>>> know which records were OK because of the Future<RecordMetadata>
> > > >> returned
> > > >>>>>> by the `send()` method. It should be able to determine whether
> it
> > > >> wants to
> > > >>>>>> commit
> > > >>>>>> the transaction even if some of the intended operations didn’t
> > > >> succeed.
> > > >>>>>>
> > > >>>>>> What we don’t currently have is a way for the application to say
> > to
> > > >> the
> > > >>>>>> KafkaProducer
> > > >>>>>> that it knows the outcome of sending the records and to confirm
> > that
> > > >> it
> > > >>>>>> wants to proceed.
> > > >>>>>> Then it would not be necessary for `commitTransaction()` to
> throw
> > an
> > > >>>>>> exception to
> > > >>>>>> report a historical error which the application might choose to
> > > >> ignore.
> > > >>>>>>
> > > >>>>>> Having read the comments, I think the KIP is on the right lines
> > > >> focusing
> > > >>>>>> on the `flush()`
> > > >>>>>> method. My suggestion is that we introduce an option on
> `flush()`
> > to
> > > >> be
> > > >>>>>> used before
> > > >>>>>> `commitTransaction()` for applications that want to be able to
> > > commit
> > > >>>>>> transactions which
> > > >>>>>> had known failed operations.
> > > >>>>>>
> > > >>>>>> The code would be:
> > > >>>>>>
> > > >>>>>>    producer.beginTransaction();
> > > >>>>>>
> > > >>>>>>    future1 = producer.send(goodRecord1);
> > > >>>>>>    future2 = producer.send(badRecord); // The future from this
> > call
> > > >> will
> > > >>>>>> complete exceptionally
> > > >>>>>>    future3 = producer.send(goodRecord2);
> > > >>>>>>
> > > >>>>>>    producer.flush(FlushOption.TRANSACTION_READY);
> > > >>>>>>
> > > >>>>>>    // At this point, we know that all 3 futures are complete and
> > the
> > > >>>>>> transaction contains 2 records
> > > >>>>>>    producer.commitTransaction();
> > > >>>>>>
> > > >>>>>> I wouldn’t deprecate `flush()` with no option. It just uses the
> > > >> default
> > > >>>>>> option which behaves
> > > >>>>>> like today.
> > > >>>>>>
> > > >>>>>> Why did I suggest an option on `flush()` rather than
> > > >>>>>> `commitTransaction()`? Because with
> > > >>>>>> `flush()`, it’s clear when the application is stating that it’s
> > seen
> > > >> all
> > > >>>>>> of the results from its
> > > >>>>>> `send()` calls and it’s ready to proceed. If it has to rely on
> > > >> flushing
> > > >>>>>> that occurs inside
> > > >>>>>> `commitTransaction()`, I don’t see it’s as clear-cut.
> > > >>>>>>
> > > >>>>>> Thanks,
> > > >>>>>> Andrew
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>> On 24 Jun 2024, at 13:44, Alieh Saeedi
> > > <asae...@confluent.io.INVALID
> > > >>>
> > > >>>>>> wrote:
> > > >>>>>>>
> > > >>>>>>> Hi all,
> > > >>>>>>> Thanks for the interesting discussion.
> > > >>>>>>>
> > > >>>>>>> I assume that now the main questions are as follows:
> > > >>>>>>>
> > > >>>>>>> 1. Do we need to transit the transcation to the error state for
> > API
> > > >>>>>>> exceptions?
> > > >>>>>>> 2. Should we throw the API exception in `send()` instead of
> > > >> returning a
> > > >>>>>>> future error?
> > > >>>>>>> 3. If the answer to question (1) is NO and to question (2) is
> > YES,
> > > >> do we
> > > >>>>>>> need to change the current `flush` or `commitTnx` at all?
> > > >>>>>>>
> > > >>>>>>> Cheers,
> > > >>>>>>> Alieh
> > > >>>>>>>
> > > >>>>>>> On Sat, Jun 22, 2024 at 3:21 AM Matthias J. Sax <
> > mj...@apache.org>
> > > >>>>>> wrote:
> > > >>>>>>>
> > > >>>>>>>> Hey Kirk,
> > > >>>>>>>>
> > > >>>>>>>> can you elaborate on a few points?
> > > >>>>>>>>
> > > >>>>>>>>> Otherwise users would have to know to explicitly change their
> > > code
> > > >> to
> > > >>>>>>>> invoke flush().
> > > >>>>>>>>
> > > >>>>>>>> Why? If we would add an option to `flush(FlushOption)`, the
> > > existing
> > > >>>>>>>> `flush()` w/o any option will still be there, right? If we
> would
> > > >> really
> > > >>>>>>>> deprecate existing `flush()`, it would just mean that we would
> > > pass
> > > >>>>>>>> "default FlushOption" into an implicit flush (and yes, we
> would
> > > >> need to
> > > >>>>>>>> define what this would be).
> > > >>>>>>>>
> > > >>>>>>>> I think there is no clear winner (as pointed out in my last
> > > reply),
> > > >> and
> > > >>>>>>>> both `flush(FlushOption)` and `commitTx(CommitOption)` has
> > > >> advantages
> > > >>>>>>>> and drawbacks. Guess we need to just agree on which tradeoff
> we
> > > >> want to
> > > >>>>>>>> move forward with?
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> Not sure if your database example is a 1:1 fit? I think, the
> > > better
> > > >>>>>>>> comparison would be:
> > > >>>>>>>>
> > > >>>>>>>> BEGIN TX;
> > > >>>>>>>> INSERT INTO foo VALUES (’a’);
> > > >>>>>>>> INSERT INTO foo VALUES (’b’);
> > > >>>>>>>> INSERT INTO foo VALUES (’c’);
> > > >>>>>>>> INSERT INTO foo VALUES (’not sure’);
> > > >>>>>>>>
> > > >>>>>>>> For this case, the full TX would roll back, right? I still
> think
> > > >> that
> > > >>>>>>>> allowing users to just skip over the last error, and continue
> > the
> > > TX
> > > >>>>>>>> would be ok. In the end, we provide a programmatic API, and
> not
> > a
> > > >>>>>>>> declarative one as SQL. Of course, default behavior would
> still
> > be
> > > >> to
> > > >>>>>>>> put the producer into error state, and the user would need to
> > call
> > > >>>>>>>> `abortTransaction()` to move forward.
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> -Matthias
> > > >>>>>>>>
> > > >>>>>>>> On 6/21/24 5:26 PM, Kirk True wrote:
> > > >>>>>>>>> Hi Matthias,
> > > >>>>>>>>>
> > > >>>>>>>>>> On Jun 21, 2024, at 12:28 PM, Matthias J. Sax <
> > mj...@apache.org
> > > >
> > > >>>>>> wrote:
> > > >>>>>>>>>>
> > > >>>>>>>>>> If we want to limit it to `RecordTooLargeException` throwing
> > > from
> > > >>>>>>>> `send()` directly make sense. Thanks for calling it out.
> > > >>>>>>>>>>
> > > >>>>>>>>>> It's still a question of backward compatibility? `send()`
> does
> > > >> throw
> > > >>>>>>>> exceptions already, including generic `KafkaException`. Not
> sure
> > > if
> > > >> this
> > > >>>>>>>> helps with backward compatibility? Could we just add a new
> > > exception
> > > >>>>>> type
> > > >>>>>>>> (which is a child of `KafkaException`)?
> > > >>>>>>>>>>
> > > >>>>>>>>>> The Producer JavaDocs are not totally explicit about it
> IMHO.
> > > >>>>>>>>>>
> > > >>>>>>>>>> I think we could expect that some generic error handling
> path
> > > gets
> > > >>>>>>>> executed. For the TX-case, I would assume that a TX would be
> > > >> aborted if
> > > >>>>>>>> `send()` throws or that the producer would be `closed()`.
> > Overall
> > > >> this
> > > >>>>>>>> might be safe?
> > > >>>>>>>>>>
> > > >>>>>>>>>>> It would be a little less flexible
> > > >>>>>>>>>>>> though, since (as you note) it would still be impossible
> to
> > > >> commit
> > > >>>>>>>>>>>> transactions after errors have been reported from brokers.
> > > >>>>>>>>>>
> > > >>>>>>>>>> KS would still need a way to clear the error state of the
> > > >> producer. We
> > > >>>>>>>> could catch a `RecordTooLargeException` from `send()`, call
> the
> > > >> handler
> > > >>>>>> and
> > > >>>>>>>> let it decide what to do next. But if it does return
> `CONTINUE`
> > to
> > > >>>>>> swallow
> > > >>>>>>>> the error and drop the poison pill record on the floor, we
> would
> > > >> want to
> > > >>>>>>>> move forward and commit the transaction.
> > > >>>>>>>>>>
> > > >>>>>>>>>> But the question is: if we cannot add a record to the tx,
> does
> > > the
> > > >>>>>>>> producer need to go into error state? In the end, we did throw
> > and
> > > >>>>>> inform
> > > >>>>>>>> the app that the record was _not_ added, and it's up to the
> app
> > to
> > > >>>>>> decide
> > > >>>>>>>> what to do next?
> > > >>>>>>>>>
> > > >>>>>>>>> That’s an excellent question…
> > > >>>>>>>>>
> > > >>>>>>>>> Imagine the user’s application is writing information to a
> > > database
> > > >>>>>>>> instead of Kafka. If there’s a table with a CHAR(1) column and
> > > this
> > > >> SQL
> > > >>>>>>>> statement was attempted, what should happen?
> > > >>>>>>>>>
> > > >>>>>>>>>     INSERT INTO foo VALUES (’not sure’);
> > > >>>>>>>>>
> > > >>>>>>>>> Yes, that DML would fail, sure, but would the user expect
> that
> > > the
> > > >>>>>>>> connection used by database library would get stuck in some
> kind
> > > of
> > > >>>>>> error
> > > >>>>>>>> state? A user would be able catch the error and either
> continue
> > or
> > > >>>>>> abort,
> > > >>>>>>>> based on their business rules.
> > > >>>>>>>>>
> > > >>>>>>>>> So I agree with what I believe you’re implying: we shouldn’t
> > > >> poison the
> > > >>>>>>>> Producer/TransactionManager on certain types of
> > application-level
> > > >>>>>> errors in
> > > >>>>>>>> send().
> > > >>>>>>>>>
> > > >>>>>>>>> Kirk
> > > >>>>>>>>>
> > > >>>>>>>>>> If we report the error only via the `Callback` it's a
> > different
> > > >> story,
> > > >>>>>>>> because the contract for this case is clearly specified on the
> > > >> JavaDocs:
> > > >>>>>>>>>>
> > > >>>>>>>>>>> When used as part of a transaction, it is not necessary to
> > > >> define a
> > > >>>>>>>> callback or check the result of the future
> > > >>>>>>>>>>> in order to detect errors from <code>send</code>. If any of
> > the
> > > >> send
> > > >>>>>>>> calls failed with an irrecoverable error,
> > > >>>>>>>>>>> the final {@link #commitTransaction()} call will fail and
> > throw
> > > >> the
> > > >>>>>>>> exception from the last failed send. When
> > > >>>>>>>>>>> this happens, your application should call {@link
> > > >>>>>> #abortTransaction()}
> > > >>>>>>>> to reset the state and continue to send
> > > >>>>>>>>>>> data.
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>> -Matthias
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>> On 6/21/24 11:42 AM, Chris Egerton wrote:
> > > >>>>>>>>>>> Hi Artem,
> > > >>>>>>>>>>> I think it'd make sense to throw directly from send
> whenever
> > > >>>>>> possible,
> > > >>>>>>>>>>> instead of returning an already-completed future. I didn't
> do
> > > >> that in
> > > >>>>>>>> my
> > > >>>>>>>>>>> bug fix to try to be conservative about breaking changes
> but
> > > this
> > > >>>>>>>> seems to
> > > >>>>>>>>>>> have caused its own set of headaches. It would be a little
> > less
> > > >>>>>>>> flexible
> > > >>>>>>>>>>> though, since (as you note) it would still be impossible to
> > > >> commit
> > > >>>>>>>>>>> transactions after errors have been reported from brokers.
> > > >>>>>>>>>>> I'll leave it up to the Kafka Streams folks to decide if
> that
> > > >>>>>>>> flexibility
> > > >>>>>>>>>>> is required. If it is, then users could explicitly call
> > flush()
> > > >>>>>> before
> > > >>>>>>>>>>> committing (and ignoring errors for) or aborting a
> > transaction,
> > > >> if
> > > >>>>>> they
> > > >>>>>>>>>>> want to implement fine-grained error handling logic such as
> > > >> allowing
> > > >>>>>>>> errors
> > > >>>>>>>>>>> for a subset of topics to be ignored.
> > > >>>>>>>>>>> Hi Matthias,
> > > >>>>>>>>>>> Most of the time you're right and we can't throw from
> send();
> > > >>>>>> however,
> > > >>>>>>>> in
> > > >>>>>>>>>>> this case (client-side record-too-large exception), the
> error
> > > is
> > > >>>>>>>> actually
> > > >>>>>>>>>>> noticed by the producer before send() returns, so it should
> > be
> > > >>>>>>>> possible to
> > > >>>>>>>>>>> throw directly.
> > > >>>>>>>>>>> Cheers,
> > > >>>>>>>>>>> Chris
> > > >>>>>>>>>>> On Fri, Jun 21, 2024, 14:25 Matthias J. Sax <
> > mj...@apache.org>
> > > >>>>>> wrote:
> > > >>>>>>>>>>>> Not sure if we can change send and make it throw, given
> that
> > > >> send()
> > > >>>>>> is
> > > >>>>>>>>>>>> async? That is why users can register a `Callback` to
> begin
> > > >> with,
> > > >>>>>>>> right?
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> And Alieh's point about backward compatibility is also a
> > fair
> > > >>>>>> concern.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> Actually, this would potentially be even
> > > >>>>>>>>>>>>> worse than the original buggy behavior because the bug
> was
> > > >> that we
> > > >>>>>>>>>>>> ignored
> > > >>>>>>>>>>>>> errors that happened in the "send()" method itself, not
> > > >> necessarily
> > > >>>>>>>> the
> > > >>>>>>>>>>>>> ones that we got from the broker.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> My understanding was that `commitTx(swallowError)` would
> > only
> > > >>>>>> swallow
> > > >>>>>>>>>>>> `send()` errors, not errors about the actually commit. I
> > agree
> > > >> that
> > > >>>>>> it
> > > >>>>>>>>>>>> would be very bad to swallow errors about the actual tx
> > > >> commit...
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> It's a fair question if this might be too subtle; to make
> it
> > > >>>>>> explicit,
> > > >>>>>>>>>>>> we could use `CommitOpions#ignorePendingSendErors()`
> > [working
> > > >> name]
> > > >>>>>> to
> > > >>>>>>>>>>>> make it clear.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> If we think it's too subtle to change commit to swallow
> > send()
> > > >>>>>> errors,
> > > >>>>>>>>>>>> maybe going with `flush(FlushOptions)` would be clearer
> (and
> > > we
> > > >> can
> > > >>>>>>>> use
> > > >>>>>>>>>>>> `FlushOption#swallowSendErrorsForTransactions()` [working
> > > name]
> > > >> to
> > > >>>>>> be
> > > >>>>>>>>>>>> explicitly that the `FlushOption` for now has only an
> effect
> > > for
> > > >>>>>> TX).
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Thoughts?
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> -Matthias
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> On 6/21/24 4:10 AM, Alieh Saeedi wrote:
> > > >>>>>>>>>>>>> Hi all,
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> It is very exciting to see all the experts here raising
> > very
> > > >> good
> > > >>>>>>>> points.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> As we go further, we see more and more options to improve
> > our
> > > >>>>>>>> solution,
> > > >>>>>>>>>>>>> which makes concluding and updating the KIP impossible.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> The main suggestions so far are:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> 1. `flush` with `flushOptions` as input parameter
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> 2. `commitTx` with `commitOptions` as input parameter
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> 3. `send` must throw the exception
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> My concern about the 3rd suggestion:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> 1. Does the change cause any issue with backward
> > > compatibility?
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> 2. The `send (bad record)` already transits the
> transaction
> > > to
> > > >> the
> > > >>>>>>>> error
> > > >>>>>>>>>>>>> state. No user, including Streams is able to transit the
> > > >>>>>> transaction
> > > >>>>>>>> back
> > > >>>>>>>>>>>>> from the error state. Do you mean we remove the
> > > >>>>>>>>>>>>> `maybeTransitionToErrorState(e)` from here
> > > >>>>>>>>>>>>> <
> > > >>>>>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>
> > > >>
> > >
> >
> https://github.com/apache/kafka/blob/9b5b434e2a6b2d5290ea403fc02859b1c523d8aa/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1112
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> as well?
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Cheers,
> > > >>>>>>>>>>>>> Alieh
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> On Fri, Jun 21, 2024 at 8:45 AM Andrew Schofield <
> > > >>>>>>>>>>>> andrew_schofi...@live.com>
> > > >>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Hi Artem,
> > > >>>>>>>>>>>>>> I think you make a good point which is worth further
> > > >>>>>> consideration.
> > > >>>>>>>> If
> > > >>>>>>>>>>>>>> any of the existing methods is really ripe for a change
> > > here,
> > > >> it’s
> > > >>>>>>>> the
> > > >>>>>>>>>>>>>> send() that actually caused the problem. If that can be
> > > fixed
> > > >> so
> > > >>>>>>>> there
> > > >>>>>>>>>>>> are
> > > >>>>>>>>>>>>>> no situations in which a lurking error breaks a
> > transaction,
> > > >> that
> > > >>>>>>>> might
> > > >>>>>>>>>>>> be
> > > >>>>>>>>>>>>>> the best.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Thanks,
> > > >>>>>>>>>>>>>> Andrew
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> On 21 Jun 2024, at 01:51, Artem Livshits <
> > > >> alivsh...@confluent.io
> > > >>>>>>>>>>>> .INVALID>
> > > >>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> I thought we still wait for requests (and their
> errors)
> > to
> > > >> come
> > > >>>>>>>> in and
> > > >>>>>>>>>>>>>>> could handle fatal errors appropriately.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> We do wait for requests, but my understanding is that
> > when
> > > >>>>>>>>>>>>>>> commitTransaction("ignore send errors") we want to
> ignore
> > > >> errors.
> > > >>>>>>>> So
> > > >>>>>>>>>>>> if
> > > >>>>>>>>>>>>>> we
> > > >>>>>>>>>>>>>>> do
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> 1. send
> > > >>>>>>>>>>>>>>> 2. commitTransaction("ignore send errors")
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> the commit will succeed.  You can look at the example
> in
> > > >>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-9279 and
> > just
> > > >>>>>>>> substitute
> > > >>>>>>>>>>>>>>> commitTransaction with commitTransaction("ignore send
> > > >> errors")
> > > >>>>>> and
> > > >>>>>>>> we
> > > >>>>>>>>>>>> get
> > > >>>>>>>>>>>>>>> the buggy behavior back :-).  Actually, this would
> > > >> potentially be
> > > >>>>>>>> even
> > > >>>>>>>>>>>>>>> worse than the original buggy behavior because the bug
> > was
> > > >> that
> > > >>>>>> we
> > > >>>>>>>>>>>>>> ignored
> > > >>>>>>>>>>>>>>> errors that happened in the "send()" method itself, not
> > > >>>>>>>> necessarily the
> > > >>>>>>>>>>>>>>> ones that we got from the broker.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Actually, looking at
> > > >>>>>>>> https://github.com/apache/kafka/pull/11508/files,
> > > >>>>>>>>>>>>>>> wouldn't a better solution be to just throw the error
> > from
> > > >> the
> > > >>>>>>>> "send"
> > > >>>>>>>>>>>>>>> method itself, rather than trying to set it to be
> thrown
> > > >> during
> > > >>>>>>>> commit?
> > > >>>>>>>>>>>>>>> This way the example in
> > > >>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-9279
> > > >>>>>>>>>>>>>>> would be fixed, and at the same time it would give an
> > > >> opportunity
> > > >>>>>>>> for
> > > >>>>>>>>>>>> KS
> > > >>>>>>>>>>>>>> to
> > > >>>>>>>>>>>>>>> catch the error and ignore it if needed.  Not sure if
> we
> > > >> need a
> > > >>>>>>>> KIP for
> > > >>>>>>>>>>>>>>> that, just do a better fix of the old bug.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> -Artem
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> On Thu, Jun 20, 2024 at 4:58 PM Justine Olshan
> > > >>>>>>>>>>>>>> <jols...@confluent.io.invalid>
> > > >>>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> I'm a bit late to the party, but the discussion here
> > looks
> > > >>>>>>>> reasonable.
> > > >>>>>>>>>>>>>>>> Moving the logic to a transactional method makes sense
> > to
> > > >> me and
> > > >>>>>>>> makes
> > > >>>>>>>>>>>>>> me
> > > >>>>>>>>>>>>>>>> feel a bit better about keeping the complexity in the
> > > >> methods
> > > >>>>>>>> relevant
> > > >>>>>>>>>>>>>> to
> > > >>>>>>>>>>>>>>>> the issue.
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> One minor concern is that if we set "ignore send
> > > >>>>>>>>>>>>>>>> errors" (or whatever we decide to name it) option
> > without
> > > >>>>>> explicit
> > > >>>>>>>>>>>>>> flush,
> > > >>>>>>>>>>>>>>>> it'll actually lead to broken behavior as the
> > application
> > > >> won't
> > > >>>>>> be
> > > >>>>>>>>>>>> able
> > > >>>>>>>>>>>>>> to
> > > >>>>>>>>>>>>>>>> stop a commit from proceeding even on fatal errors.
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Is this with respect to the case a request is still
> > > inflight
> > > >>>>>> when
> > > >>>>>>>> we
> > > >>>>>>>>>>>>>> call
> > > >>>>>>>>>>>>>>>> commitTransaction? I thought we still wait for
> requests
> > > (and
> > > >>>>>> their
> > > >>>>>>>>>>>>>> errors)
> > > >>>>>>>>>>>>>>>> to come in and could handle fatal errors
> appropriately.
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Justine
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> On Thu, Jun 20, 2024 at 4:32 PM Artem Livshits
> > > >>>>>>>>>>>>>>>> <alivsh...@confluent.io.invalid> wrote:
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Hi Matthias (and other folks who suggested ideas),
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> maybe `commitTransaction(CommitOptions)` or similar
> > > could
> > > >> be a
> > > >>>>>>>> good
> > > >>>>>>>>>>>>>>>> way
> > > >>>>>>>>>>>>>>>>> forward?
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> I like this approach.  One minor concern is that if
> we
> > > set
> > > >>>>>>>> "ignore
> > > >>>>>>>>>>>> send
> > > >>>>>>>>>>>>>>>>> errors" (or whatever we decide to name it) option
> > without
> > > >>>>>>>> explicit
> > > >>>>>>>>>>>>>> flush,
> > > >>>>>>>>>>>>>>>>> it'll actually lead to broken behavior as the
> > application
> > > >> won't
> > > >>>>>>>> be
> > > >>>>>>>>>>>> able
> > > >>>>>>>>>>>>>>>> to
> > > >>>>>>>>>>>>>>>>> stop a commit from proceeding even on fatal errors.
> > But
> > > I
> > > >>>>>> guess
> > > >>>>>>>>>>>> we'll
> > > >>>>>>>>>>>>>>>> just
> > > >>>>>>>>>>>>>>>>> have to clearly document it.
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> In some way we are basically adding a flag to
> > optionally
> > > >>>>>> restore
> > > >>>>>>>> the
> > > >>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-9279
> bug,
> > > >> which is
> > > >>>>>>>> the
> > > >>>>>>>>>>>>>>>>> motivation for all these changes, anyway :-).
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> -Artem
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> On Thu, Jun 20, 2024 at 2:18 PM Matthias J. Sax <
> > > >>>>>>>> mj...@apache.org>
> > > >>>>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Seems the option to use a config does not get a lot
> of
> > > >>>>>> support.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> So we need to go with some form or "overload / new
> > > >> method". I
> > > >>>>>>>> think
> > > >>>>>>>>>>>>>>>>>> Chris' point about not coupling it to `flush()` but
> > > rather
> > > >>>>>>>>>>>>>>>>>> `commitTransaction()` is actually a very good one;
> for
> > > >> non-tx
> > > >>>>>>>> case,
> > > >>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>> different flush variants would not make sense.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> I also like Lianet's idea to pass in some "options"
> > > >> object, so
> > > >>>>>>>> maybe
> > > >>>>>>>>>>>>>>>>>> `commitTransaction(CommitOptions)` or similar could
> > be a
> > > >> good
> > > >>>>>>>> way
> > > >>>>>>>>>>>>>>>>>> forward? It's much better than a `boolean`
> parameter,
> > > >>>>>>>> aesthetically,
> > > >>>>>>>>>>>>>> as
> > > >>>>>>>>>>>>>>>>>> we as extendable in the future if necessary.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Given that we would pass in an optional parameter,
> we
> > > >> might
> > > >>>>>> not
> > > >>>>>>>> even
> > > >>>>>>>>>>>>>>>>>> need to deprecate the existing `commitTransaction()`
> > > >> method?
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> -Matthias
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> On 6/20/24 9:12 AM, Andrew Schofield wrote:
> > > >>>>>>>>>>>>>>>>>>> Hi Alieh,
> > > >>>>>>>>>>>>>>>>>>> Thanks for the KIP.
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> I *really* don’t like adding a config which changes
> > the
> > > >>>>>>>> behaviour
> > > >>>>>>>>>>>> of
> > > >>>>>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>>> flush() method. We already have too many configs.
> > But I
> > > >>>>>> totally
> > > >>>>>>>>>>>>>>>>>> understand
> > > >>>>>>>>>>>>>>>>>>> the problem that you’re trying to solve and some of
> > the
> > > >> other
> > > >>>>>>>>>>>>>>>>> suggestions
> > > >>>>>>>>>>>>>>>>>>> in this thread seem neater.
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> Personally, I would add another method to
> > > KafkaProducer.
> > > >> Not
> > > >>>>>> an
> > > >>>>>>>>>>>>>>>>> overload
> > > >>>>>>>>>>>>>>>>>>> on flush() because this is not flush() at all.
> Using
> > > >>>>>> Matthias’s
> > > >>>>>>>>>>>>>>>>> options,
> > > >>>>>>>>>>>>>>>>>>> I prefer (3).
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> Thanks,
> > > >>>>>>>>>>>>>>>>>>> Andrew
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> On 20 Jun 2024, at 15:08, Lianet M. <
> > > liane...@gmail.com
> > > >>>
> > > >>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> Hi all, thanks for the KIP Alieh!
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> LM1. Totally agree with Artem's point about the
> > config
> > > >> not
> > > >>>>>>>> being
> > > >>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>> most
> > > >>>>>>>>>>>>>>>>>>>> explicit/flexible way to express this capability.
> > > >> Getting
> > > >>>>>>>> then to
> > > >>>>>>>>>>>>>>>>>> Matthias
> > > >>>>>>>>>>>>>>>>>>>> 4 options, what I don't like about 3 and 4 is that
> > it
> > > >> seems
> > > >>>>>>>> they
> > > >>>>>>>>>>>>>>>> might
> > > >>>>>>>>>>>>>>>>>> not
> > > >>>>>>>>>>>>>>>>>>>> age very well? Aren't we going to be wanting some
> > > other
> > > >>>>>> twist
> > > >>>>>>>> to
> > > >>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>> flush
> > > >>>>>>>>>>>>>>>>>>>> semantics that will have us adding yet another
> param
> > > to
> > > >> it,
> > > >>>>>> or
> > > >>>>>>>>>>>>>>>> another
> > > >>>>>>>>>>>>>>>>>>>> overloaded method? I truly don't have the context
> to
> > > >> answer
> > > >>>>>>>> that,
> > > >>>>>>>>>>>>>>>> but
> > > >>>>>>>>>>>>>>>>>> if it
> > > >>>>>>>>>>>>>>>>>>>> feels like a realistic future maybe adding some
> kind
> > > >>>>>>>> FlushOptions
> > > >>>>>>>>>>>>>>>>>> params to
> > > >>>>>>>>>>>>>>>>>>>> the flush would be better from an extensibility
> > point
> > > of
> > > >>>>>>>> view. It
> > > >>>>>>>>>>>>>>>>> would
> > > >>>>>>>>>>>>>>>>>>>> only have the clearErrors option available for now
> > but
> > > >> could
> > > >>>>>>>>>>>> accept
> > > >>>>>>>>>>>>>>>>> any
> > > >>>>>>>>>>>>>>>>>>>> other we may need. I find that this would remove
> the
> > > >>>>>>>> "ugliness"
> > > >>>>>>>>>>>>>>>>> Matthias
> > > >>>>>>>>>>>>>>>>>>>> pointed out for 3. and 4.
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> LM2. No matter how we end up expressing the
> > different
> > > >>>>>>>> semantics
> > > >>>>>>>>>>>> for
> > > >>>>>>>>>>>>>>>>>> flush,
> > > >>>>>>>>>>>>>>>>>>>> let's make sure we update the KIP on the flush and
> > > >>>>>>>>>>>> commitTransaction
> > > >>>>>>>>>>>>>>>>>> java
> > > >>>>>>>>>>>>>>>>>>>> docs. It currently states that  flush "clears the
> > last
> > > >>>>>>>> exception"
> > > >>>>>>>>>>>>>>>> and
> > > >>>>>>>>>>>>>>>>>>>> commitTransaction "will NOT throw" if called after
> > > >> flush,
> > > >>>>>> but
> > > >>>>>>>> it
> > > >>>>>>>>>>>>>>>>> really
> > > >>>>>>>>>>>>>>>>>> all
> > > >>>>>>>>>>>>>>>>>>>> depends on the config/options/method used.
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> LM3. I find it would be helpful to include an
> > example
> > > to
> > > >>>>>> show
> > > >>>>>>>> the
> > > >>>>>>>>>>>>>>>> new
> > > >>>>>>>>>>>>>>>>>> flow
> > > >>>>>>>>>>>>>>>>>>>> that we're unblocking (I see this as the great
> gain
> > > >> here):
> > > >>>>>>>> flush
> > > >>>>>>>>>>>>>>>> with
> > > >>>>>>>>>>>>>>>>>> clear
> > > >>>>>>>>>>>>>>>>>>>> error option enabled -> catch and do whatever
> error
> > > >> handling
> > > >>>>>>>> we
> > > >>>>>>>>>>>> want
> > > >>>>>>>>>>>>>>>>> ->
> > > >>>>>>>>>>>>>>>>>>>> commitTransaction successfully
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> Thanks!
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> Lianet
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> On Wed, Jun 19, 2024 at 11:26 PM Chris Egerton <
> > > >>>>>>>>>>>>>>>>> fearthecel...@gmail.com
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> Hi Matthias,
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> I like the alternatives you've listed. One more
> > that
> > > >> might
> > > >>>>>>>> help
> > > >>>>>>>>>>>> is
> > > >>>>>>>>>>>>>>>>> if,
> > > >>>>>>>>>>>>>>>>>>>>> instead of overloading flush(), we overloaded
> > > >>>>>>>> commitTransaction()
> > > >>>>>>>>>>>>>>>> to
> > > >>>>>>>>>>>>>>>>>>>>> something like commitTransaction(boolean
> > > >>>>>>>> tolerateRecordErrors).
> > > >>>>>>>>>>>>>>>> This
> > > >>>>>>>>>>>>>>>>>> seems
> > > >>>>>>>>>>>>>>>>>>>>> slightly cleaner in that it takes the behavioral
> > > >> change we
> > > >>>>>>>> want,
> > > >>>>>>>>>>>>>>>>> which
> > > >>>>>>>>>>>>>>>>>> only
> > > >>>>>>>>>>>>>>>>>>>>> applies to transactional producers, to an API
> > method
> > > >> that
> > > >>>>>> is
> > > >>>>>>>> only
> > > >>>>>>>>>>>>>>>>> used
> > > >>>>>>>>>>>>>>>>>> for
> > > >>>>>>>>>>>>>>>>>>>>> transactional producers. It would also avoid the
> > > issue
> > > >> of
> > > >>>>>>>> whether
> > > >>>>>>>>>>>>>>>> or
> > > >>>>>>>>>>>>>>>>>> not
> > > >>>>>>>>>>>>>>>>>>>>> flush() (or a new variant of it with altered
> > > semantics)
> > > >>>>>>>> should
> > > >>>>>>>>>>>>>>>> throw
> > > >>>>>>>>>>>>>>>>> or
> > > >>>>>>>>>>>>>>>>>>>>> not. Thoughts?
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> Hi Alieh,
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> Thanks for the KIP, I like this direction a lot
> > more
> > > >> than
> > > >>>>>> the
> > > >>>>>>>>>>>>>>>>> pluggable
> > > >>>>>>>>>>>>>>>>>>>>> handler!
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> I share Artem's concerns that enabling this
> > behavior
> > > >> via
> > > >>>>>>>>>>>>>>>>> configuration
> > > >>>>>>>>>>>>>>>>>>>>> doesn't seem like a great fit. It's likely that
> > > >> application
> > > >>>>>>>> code
> > > >>>>>>>>>>>>>>>> will
> > > >>>>>>>>>>>>>>>>>> be
> > > >>>>>>>>>>>>>>>>>>>>> written in a style that only works with one type
> of
> > > >>>>>> behavior
> > > >>>>>>>> from
> > > >>>>>>>>>>>>>>>>>>>>> transactional producers, so requiring that
> > > application
> > > >> code
> > > >>>>>>>> to
> > > >>>>>>>>>>>>>>>>> declare
> > > >>>>>>>>>>>>>>>>>> its
> > > >>>>>>>>>>>>>>>>>>>>> expectations for the behavior of its producer
> seems
> > > >> more
> > > >>>>>>>>>>>>>>>> appropriate
> > > >>>>>>>>>>>>>>>>>> than,
> > > >>>>>>>>>>>>>>>>>>>>> e.g., allowing users deploying that application
> to
> > > >> tweak a
> > > >>>>>>>>>>>>>>>>>> configuration
> > > >>>>>>>>>>>>>>>>>>>>> file that gets fed to producers spun up inside
> it.
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> Cheers,
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> Chris
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> On Wed, Jun 19, 2024 at 10:32 PM Matthias J. Sax
> <
> > > >>>>>>>>>>>> mj...@apache.org
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP Alieh. I actually like the
> KIP
> > > >> as-is,
> > > >>>>>> but
> > > >>>>>>>>>>>> think
> > > >>>>>>>>>>>>>>>>>>>>>> Arthem raises very good points...
> > > >>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>> Seems we have four options on how to move
> forward?
> > > >>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>   1. add config to allow "silent error
> clearance"
> > as
> > > >> the
> > > >>>>>>>> KIP
> > > >>>>>>>>>>>>>>>>> proposes
> > > >>>>>>>>>>>>>>>>>>>>>>   2. change flush() to clear error and let it
> > throw
> > > >>>>>>>>>>>>>>>>>>>>>>   3. add new flushAndThrow()` (or better name)
> > which
> > > >>>>>> clears
> > > >>>>>>>>>>>> error
> > > >>>>>>>>>>>>>>>>> and
> > > >>>>>>>>>>>>>>>>>>>>>> throws
> > > >>>>>>>>>>>>>>>>>>>>>>   4. add `flush(boolean clearAndThrow)` and let
> > user
> > > >> pick
> > > >>>>>>>> (and
> > > >>>>>>>>>>>>>>>>>> deprecate
> > > >>>>>>>>>>>>>>>>>>>>>> existing `flush()`)
> > > >>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>> For (2), given that it would be a behavior
> change,
> > > we
> > > >>>>>> might
> > > >>>>>>>> also
> > > >>>>>>>>>>>>>>>>> need
> > > >>>>>>>>>>>>>>>>>> a
> > > >>>>>>>>>>>>>>>>>>>>>> public "feature flag" config.
> > > >>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>> It seems, both (1) and (2) have the issue Artem
> > > >> mentioned.
> > > >>>>>>>> (3)
> > > >>>>>>>>>>>> and
> > > >>>>>>>>>>>>>>>>> (4)
> > > >>>>>>>>>>>>>>>>>>>>>> would be safer to this end, however, for both we
> > > >> kinda get
> > > >>>>>>>> an
> > > >>>>>>>>>>>> ugly
> > > >>>>>>>>>>>>>>>>>> API?
> > > >>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>> Not sure right now if I have any preference.
> Seems
> > > we
> > > >> need
> > > >>>>>>>> to
> > > >>>>>>>>>>>> pick
> > > >>>>>>>>>>>>>>>>>> some
> > > >>>>>>>>>>>>>>>>>>>>>> evil and that there is no clear best solution?
> > Would
> > > >> be
> > > >>>>>>>> good to
> > > >>>>>>>>>>>>>>>> her
> > > >>>>>>>>>>>>>>>>>> from
> > > >>>>>>>>>>>>>>>>>>>>>> others what they think
> > > >>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>> -Matthias
> > > >>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>> On 6/18/24 8:39 PM, Artem Livshits wrote:
> > > >>>>>>>>>>>>>>>>>>>>>>> Hi Alieh,
> > > >>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>> Thank you for the KIP.  I have a couple of
> > > >> suggestions:
> > > >>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>> AL1.  We should throw an error from flush after
> > we
> > > >> clear
> > > >>>>>>>> it.
> > > >>>>>>>>>>>>>>>> This
> > > >>>>>>>>>>>>>>>>>>>>> would
> > > >>>>>>>>>>>>>>>>>>>>>>> make it so that both "send + commit" and "send
> +
> > > >> flush +
> > > >>>>>>>>>>>> commit"
> > > >>>>>>>>>>>>>>>>> (the
> > > >>>>>>>>>>>>>>>>>>>>>>> latter looks like just a more verbose way to
> > > express
> > > >> the
> > > >>>>>>>>>>>> former,
> > > >>>>>>>>>>>>>>>>> and
> > > >>>>>>>>>>>>>>>>>> it
> > > >>>>>>>>>>>>>>>>>>>>>>> would be intuitive if it behaves the same)
> would
> > > >> throw if
> > > >>>>>>>> the
> > > >>>>>>>>>>>>>>>>>>>>> transaction
> > > >>>>>>>>>>>>>>>>>>>>>>> has an error (so if the code is written either
> > way
> > > >> it's
> > > >>>>>>>> going
> > > >>>>>>>>>>>> be
> > > >>>>>>>>>>>>>>>>>>>>>> correct).
> > > >>>>>>>>>>>>>>>>>>>>>>> At the same time, the latter could be extended
> by
> > > the
> > > >>>>>>>> caller to
> > > >>>>>>>>>>>>>>>>>>>>> intercept
> > > >>>>>>>>>>>>>>>>>>>>>>> exceptions from flush, ignore as needed, and
> > commit
> > > >> the
> > > >>>>>>>>>>>>>>>>> transaction.
> > > >>>>>>>>>>>>>>>>>>>>>> This
> > > >>>>>>>>>>>>>>>>>>>>>>> solution would keep basic things simple (if
> > someone
> > > >> has
> > > >>>>>>>> code
> > > >>>>>>>>>>>> that
> > > >>>>>>>>>>>>>>>>>>>>> doesn't
> > > >>>>>>>>>>>>>>>>>>>>>>> require advanced error handling, then basic
> > "send +
> > > >>>>>> flush +
> > > >>>>>>>>>>>>>>>> commit"
> > > >>>>>>>>>>>>>>>>>>>>> would
> > > >>>>>>>>>>>>>>>>>>>>>>> do the right thing) and advanced things
> possible,
> > > an
> > > >>>>>>>>>>>> application
> > > >>>>>>>>>>>>>>>>> can
> > > >>>>>>>>>>>>>>>>>>>>> add
> > > >>>>>>>>>>>>>>>>>>>>>>> try + catch around flush and ignore some
> errors.
> > > >>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>> AL2.  I'm not sure if config is the best way to
> > > >> express
> > > >>>>>> the
> > > >>>>>>>>>>>>>>>>>>>>> modification
> > > >>>>>>>>>>>>>>>>>>>>>> of
> > > >>>>>>>>>>>>>>>>>>>>>>> the "flush" semantics -- the application logic
> > that
> > > >> calls
> > > >>>>>>>>>>>> "flush"
> > > >>>>>>>>>>>>>>>>>> needs
> > > >>>>>>>>>>>>>>>>>>>>>> to
> > > >>>>>>>>>>>>>>>>>>>>>>> match the "flush" semantics and configuring
> > > >> semantics in
> > > >>>>>> a
> > > >>>>>>>>>>>>>>>> detached
> > > >>>>>>>>>>>>>>>>>>>>> place
> > > >>>>>>>>>>>>>>>>>>>>>>> creates a room for bugs due to discrepancies.
> > This
> > > >> can
> > > >>>>>> be
> > > >>>>>>>>>>>>>>>>> especially
> > > >>>>>>>>>>>>>>>>>>>>> bad
> > > >>>>>>>>>>>>>>>>>>>>>>> if the producer loads configuration from a file
> > at
> > > >> run
> > > >>>>>>>> time, in
> > > >>>>>>>>>>>>>>>>> that
> > > >>>>>>>>>>>>>>>>>>>>>> case a
> > > >>>>>>>>>>>>>>>>>>>>>>> mistake in configuration could break the
> > > application
> > > >>>>>>>> because it
> > > >>>>>>>>>>>>>>>> was
> > > >>>>>>>>>>>>>>>>>>>>>> written
> > > >>>>>>>>>>>>>>>>>>>>>>> to expect one "flush" semantics but the
> semantics
> > > is
> > > >>>>>>>> switched.
> > > >>>>>>>>>>>>>>>>> Given
> > > >>>>>>>>>>>>>>>>>>>>>> that
> > > >>>>>>>>>>>>>>>>>>>>>>> the "flush" semantics needs to match the
> caller's
> > > >>>>>>>> expectation,
> > > >>>>>>>>>>>> a
> > > >>>>>>>>>>>>>>>>> way
> > > >>>>>>>>>>>>>>>>>> to
> > > >>>>>>>>>>>>>>>>>>>>>>> accomplish that would be to pass the caller's
> > > >> expectation
> > > >>>>>>>> to
> > > >>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>>>>> "flush"
> > > >>>>>>>>>>>>>>>>>>>>>>> call by either have a method with a different
> > name
> > > or
> > > >>>>>> have
> > > >>>>>>>> an
> > > >>>>>>>>>>>>>>>>>> overload
> > > >>>>>>>>>>>>>>>>>>>>>> with
> > > >>>>>>>>>>>>>>>>>>>>>>> a Boolen flag that would configure the
> semantics
> > > (the
> > > >>>>>>>> current
> > > >>>>>>>>>>>>>>>>> method
> > > >>>>>>>>>>>>>>>>>>>>>> could
> > > >>>>>>>>>>>>>>>>>>>>>>> just redirect to the new one).
> > > >>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>> -Artem
> > > >>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>> On Mon, Jun 17, 2024 at 9:09 AM Alieh Saeedi
> > > >>>>>>>>>>>>>>>>>>>>>> <asae...@confluent.io.invalid>
> > > >>>>>>>>>>>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>> Hi all,
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>> I'd like to kick off a discussion for KIP-1059
> > > that
> > > >>>>>>>> suggests
> > > >>>>>>>>>>>>>>>>> adding
> > > >>>>>>>>>>>>>>>>>> a
> > > >>>>>>>>>>>>>>>>>>>>>> new
> > > >>>>>>>>>>>>>>>>>>>>>>>> feature to the Producer flush() method.
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>
> > > >>
> > >
> >
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-1059%3A+Enable+the+Producer+flush%28%29+method+to+clear+the+latest+send%28%29+error>

Reply via email to