One quick thought: if a user invokes Producer::abortTransaction from within
a producer callback today, even in the midst of an ongoing call to
Producer::commitTransaction, what is the behavior? Would it be reasonable
to support this behavior as a way to allow error handling to take place
during implicit flushes, via producer callback?
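
To make the question concrete, here is a rough sketch of the semantics I am
asking about. It is a toy model only: `ToyProducer` and its methods are
hypothetical stand-ins, not the real KafkaProducer API, and the
"abort from a callback vetoes the in-progress commit" behavior is an
assumption about what we could support, not a description of what the client
does today.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model only: NOT the real KafkaProducer. All names are hypothetical. */
public class CallbackAbortSketch {

    static class ToyProducer {
        private final List<Runnable> pendingCallbacks = new ArrayList<>();
        private boolean abortRequested = false;

        // Queue a "send". The callback later receives null on success,
        // or the send error on failure.
        void send(Exception sendError, Consumer<Exception> callback) {
            pendingCallbacks.add(() -> callback.accept(sendError));
        }

        // Assumed semantics: a callback may call this to veto the commit.
        void abortTransaction() {
            abortRequested = true;
        }

        // The implicit flush runs every pending callback first; the commit
        // only goes through if no callback requested an abort.
        boolean commitTransaction() {
            for (Runnable callback : pendingCallbacks) {
                callback.run();
            }
            pendingCallbacks.clear();
            return !abortRequested;
        }
    }

    // Returns true if the toy transaction committed, false if it was vetoed.
    static boolean run(boolean includeBadRecord) {
        ToyProducer producer = new ToyProducer();
        Consumer<Exception> abortOnError = error -> {
            if (error != null) {
                producer.abortTransaction();
            }
        };
        producer.send(null, abortOnError);
        if (includeBadRecord) {
            producer.send(new IllegalStateException("record too large"), abortOnError);
        }
        producer.send(null, abortOnError);
        return producer.commitTransaction();
    }

    public static void main(String[] args) {
        System.out.println("all sends ok, committed: " + run(false));
        System.out.println("one send failed, committed: " + run(true));
    }
}
```

In this model the application never has to decide "blindly" before calling
commitTransaction: the decision is made inside the callback, once the send
error is actually known.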

On Mon, Jun 24, 2024 at 9:21 PM Matthias J. Sax <mj...@apache.org> wrote:

> My point is that it does not seem to be safe to allow users to ignore
> errors with an implicit flush, and I think it's better to only allow it
> with (ie, after) an explicit flush().
>
> My reasoning is, that users should make a decision to ignore errors or
> not, before calling `commitTx()`, but after inspecting all potential
> send errors. With an implicit flush, users need to "blindly" decide to
> ignore send errors, because there are pending sends and potential errors
> are not known yet, when calling `commitTx()`.
>
>
>
> > In the documentation of commitTransaction, we say if any send throws an
> > error, commitTransaction will too.
>
> Yes. And I think we should keep it this way for an implicit flush. With
> an explicit flush, `commitTransaction()` cannot encounter any send
> errors any longer.
>
>
>
> > It says that all callbacks will be executed, but we ignore the errors of
> > the callbacks.
>
> Ah. Thanks for pointing this out. For this case it's even worse (for
> case (2)), because the user cannot inspect any errors and make any
> decision to ignore or not during an implicit flush...
>
>
>
> > We shouldn't be relying on errors in the callback unless we are
> > calling flush, which we can still do. It seems this has always been the
> > case as well.
>
> Yes, has always been this way, and my point is to keep it this way
> (option (2) would change it), and not start to allow to ignore errors
> with an implicit flush.
>
>
>
> -Matthias
>
>
>
> On 6/24/24 4:57 PM, Justine Olshan wrote:
> > Transaction verification is a concept from KIP-890 referring to the
> > verification that a partition has been added to the transaction. It's not
> > a huge deal, but maybe we don't want to overload the terminology.
> >
> > For option 2, I was a little confused by this
> >
> >>    when commitTx is called, there are still pending Futures and not
> >> all Callbacks are executed yet -- with the implicit flush, we know that
> >> all Callbacks are executed, but even for this case, the user could only
> >> throw an exception inside the Callback to stop the TX to eventually
> >> commit -- Futures cannot be used to make a decision to ignore error and
> >> commit or not.
> >
> > In the documentation of commitTransaction, we say if any send throws an
> > error, commitTransaction will too.
> >
> > *Further, if any of the {@link #send(ProducerRecord)} calls which were part
> > of the transaction hit irrecoverable errors, this method will throw the
> > last received exception immediately and the transaction will not be
> > committed.*
> >
> > It says that all callbacks will be executed, but we ignore the errors of
> > the callbacks.
> >
> > *If the transaction is committed successfully and this method returns
> > without throwing an exception, it is guaranteed that all {@link Callback
> > callbacks} for records in the transaction will have been invoked and
> > completed. Note that exceptions thrown by callbacks are ignored; the
> > producer proceeds to commit the transaction in any case.*
> >
> > Is it fair to say, though, that for the send errors, we can choose to ignore
> > them? I wasn't understanding where the callbacks come in with your
> > comment. We shouldn't be relying on errors in the callback unless we are
> > calling flush, which we can still do. It seems this has always been the
> > case as well.
> >
> > Justine
> >
> > On Mon, Jun 24, 2024 at 11:07 AM Andrew Schofield <andrew_schofi...@live.com>
> > wrote:
> >
> >> Agreed. Options 1 and 3 are safe. Option 2 is not. I’d be happy with 3a as
> >> the way.
> >>
> >> I suggest “TRANSACTION VERIFIED”.
> >>
> >> There isn’t really precedent for options in the producer API. We could use
> >> an enum, which is easy to use and not very future-proof. Or we could use a
> >> class like the admin API does, which is cumbersome and flexible.
> >>
> >>    CommitTransactionOptions.TRANSACTION_VERIFIED
> >>
> >> or
> >>
> >>    public class CommitTransactionOptions {
> >>      public CommitTransactionOptions();
> >>
> >>      CommitTransactionOptions transactionVerified(boolean transactionVerified);
> >>
> >>      boolean transactionVerified();
> >>    }
> >>
> >>
> >> Then 3a is:
> >>
> >>     send(…)
> >>     send(…)
> >>     flush()
> >>     commitTransaction(new CommitTransactionOptions().transactionVerified(true))
> >>
> >>
> >> I’d tend towards the enum here because I doubt we need as much flexibility
> >> as the admin API requires.
> >>
> >> Thanks,
> >> Andrew
> >>
> >>
> >>> On 24 Jun 2024, at 18:39, Matthias J. Sax <mj...@apache.org> wrote:
> >>>
> >>> I am ok either way (ie, flush or commit), but I think we need to define
> >>> exact semantics, and I think there is some subtle thing to consider:
> >>>
> >>>
> >>>
> >>> 1) flush(Options)
> >>>
> >>> Example:
> >>>
> >>>   send(...)
> >>>   send(...)
> >>>
> >>>   flush(ignoreErrors)
> >>>
> >>>   // at this point, we know that all Futures are completed and all
> >>>   // Callbacks are executed, and we can assume that all user code checking
> >>>   // for errors did execute, before `commitTx` is called
> >>>
> >>>   // I consider this option as safe
> >>>
> >>>   commitTx()
> >>>
> >>>
> >>> 2) commitTx(Option)
> >>>
> >>> Example:
> >>>
> >>>   send(...)
> >>>   send(...)
> >>>
> >>>   // when commitTx is called, there are still pending Futures and not all
> >>>   // Callbacks are executed yet -- with the implicit flush, we know that all
> >>>   // Callbacks are executed, but even for this case, the user could only throw
> >>>   // an exception inside the Callback to stop the TX to eventually commit --
> >>>   // Futures cannot be used to make a decision to ignore error and commit or not.
> >>>
> >>>   // I consider this option not as safe
> >>>
> >>>   commitTx(ignoreErrors)
> >>>
> >>>
> >>>
> >>> 3a) required flush + commitTx(Option)
> >>>
> >>> Example:
> >>>
> >>>   send(...)
> >>>   send(...)
> >>>
> >>>   flush()
> >>>
> >>>   // at this point, we know that all Futures are completed and all
> >>>   // Callbacks are executed, and we can assume that all user code checking
> >>>   // for errors did execute, before `commitTx` is called
> >>>
> >>>   // I consider this option as safe
> >>>
> >>>   commitTx(ignoreErrors)
> >>>
> >>>
> >>> 3b) missing flush + commitTx(Option)
> >>>
> >>> Example:
> >>>
> >>>   send(...)
> >>>   send(...)
> >>>
> >>>   // as flush() was not called explicitly, we should ignore the
> >>>   // `ignoreErrors` flag and always throw an exception if the producer is in
> >>>   // error state, because we cannot be sure that the user did all required
> >>>   // checks for error handling
> >>>
> >>>   commitTx(ignoreErrors)
> >>>
> >>>
> >>>
> >>> The only issue with option (3) is that it's more complex and its semantics
> >>> are more subtle. But it might be a good (necessary?) bridge between (1)
> >>> and (2): (3) is semantically sound (we ignore errors via passing a flag
> >>> into commitTx() instead of flush()), and at the same time safe (we force
> >>> users to explicitly flush() and [hopefully] do proper error handling, and
> >>> don't rely on an implicit flush() during commitTx(), which might be
> >>> error-prone).
> >>>
> >>> (Also need to find a good and descriptive name for the flag we pass into
> >>> `commitTx()` for this case.)
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>>
> >>> On 6/24/24 8:51 AM, Andrew Schofield wrote:
> >>>> Hi Chris,
> >>>> That works for me too. I slightly prefer an option on flush(), but
> what
> >> you suggested
> >>>> works too.
> >>>> Thanks,
> >>>> Andrew
> >>>>> On 24 Jun 2024, at 15:14, Chris Egerton <chr...@aiven.io.INVALID>
> >> wrote:
> >>>>>
> >>>>> Hi Andrew,
> >>>>>
> >>>>> I like a lot of what you said, but I still believe it's better to
> >> override
> >>>>> commitTransaction than flush. Users will already have to manually opt
> >> in to
> >>>>> ignoring errors encountered during transactions, and we can document
> >>>>> recommended usage (i.e., explicitly invoking flush() before invoking
> >>>>> commitTransaction(ignoreRecordErrors)) in the newly-introduced
> method.
> >> I
> >>>>> don't believe it's worth the increased cognitive load on users with
> >>>>> non-transactional producers to introduce an overloaded flush()
> variant.
> >>>>>
> >>>>> Cheers,
> >>>>>
> >>>>> Chris
> >>>>>
> >>>>> On Mon, Jun 24, 2024 at 9:39 AM Andrew Schofield <
> >> andrew_schofi...@live.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi Alieh,
> >>>>>> Thanks for driving this. Unfortunately, there are many parts of the
> >> API
> >>>>>> which
> >>>>>> are a bit unfortunate and it’s tricky to make small improvements
> that
> >>>>>> don’t have
> >>>>>> downsides.
> >>>>>>
> >>>>>> I don’t like the idea of using a configuration because configuration
> >> is
> >>>>>> often
> >>>>>> outside the application and changing the behaviour of someone else’s
> >>>>>> application
> >>>>>> without understanding it is risky. Anything which embeds a
> >> transactional
> >>>>>> producer
> >>>>>> could have its behaviour changed unexpectedly.
> >>>>>>
> >>>>>> It would have been much nicer if send() didn’t fail silently and change
> >>>>>> the transaction state. But, because it’s an asynchronous operation, I
> >>>>>> don’t really think we can just make it throw all exceptions, even though
> >>>>>> I really think that `send()` is the method with the problem here.
> >>>>>>
> >>>>>> The contract of `flush()` is that it makes sure that all preceding
> >> sends
> >>>>>> will have
> >>>>>> completed, so it should be true that a well written application
> would
> >> be
> >>>>>> able to
> >>>>>> know which records were OK because of the Future<RecordMetadata>
> >> returned
> >>>>>> by the `send()` method. It should be able to determine whether it
> >> wants to
> >>>>>> commit
> >>>>>> the transaction even if some of the intended operations didn’t
> >> succeed.
> >>>>>>
> >>>>>> What we don’t currently have is a way for the application to say to
> >> the
> >>>>>> KafkaProducer
> >>>>>> that it knows the outcome of sending the records and to confirm that
> >> it
> >>>>>> wants to proceed.
> >>>>>> Then it would not be necessary for `commitTransaction()` to throw an
> >>>>>> exception to
> >>>>>> report a historical error which the application might choose to
> >> ignore.
> >>>>>>
> >>>>>> Having read the comments, I think the KIP is on the right lines
> >> focusing
> >>>>>> on the `flush()`
> >>>>>> method. My suggestion is that we introduce an option on `flush()` to
> >> be
> >>>>>> used before
> >>>>>> `commitTransaction()` for applications that want to be able to
> commit
> >>>>>> transactions which
> >>>>>> had known failed operations.
> >>>>>>
> >>>>>> The code would be:
> >>>>>>
> >>>>>>    producer.beginTransaction();
> >>>>>>
> >>>>>>    future1 = producer.send(goodRecord1);
> >>>>>>    future2 = producer.send(badRecord); // The future from this call will complete exceptionally
> >>>>>>    future3 = producer.send(goodRecord2);
> >>>>>>
> >>>>>>    producer.flush(FlushOption.TRANSACTION_READY);
> >>>>>>
> >>>>>>    // At this point, we know that all 3 futures are complete and the
> >>>>>>    // transaction contains 2 records
> >>>>>>    producer.commitTransaction();
> >>>>>>
> >>>>>> I wouldn’t deprecate `flush()` with no option. It just uses the
> >> default
> >>>>>> option which behaves
> >>>>>> like today.
> >>>>>>
> >>>>>> Why did I suggest an option on `flush()` rather than
> >>>>>> `commitTransaction()`? Because with
> >>>>>> `flush()`, it’s clear when the application is stating that it’s seen
> >> all
> >>>>>> of the results from its
> >>>>>> `send()` calls and it’s ready to proceed. If it has to rely on
> >> flushing
> >>>>>> that occurs inside
> >>>>>> `commitTransaction()`, I don’t see it’s as clear-cut.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Andrew
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>> On 24 Jun 2024, at 13:44, Alieh Saeedi
> <asae...@confluent.io.INVALID
> >>>
> >>>>>> wrote:
> >>>>>>>
> >>>>>>> Hi all,
> >>>>>>> Thanks for the interesting discussion.
> >>>>>>>
> >>>>>>> I assume that now the main questions are as follows:
> >>>>>>>
> >>>>>>> 1. Do we need to transition the transaction to the error state for API
> >>>>>>> exceptions?
> >>>>>>> 2. Should we throw the API exception in `send()` instead of
> >> returning a
> >>>>>>> future error?
> >>>>>>> 3. If the answer to question (1) is NO and to question (2) is YES,
> >> do we
> >>>>>>> need to change the current `flush` or `commitTnx` at all?
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Alieh
> >>>>>>>
> >>>>>>> On Sat, Jun 22, 2024 at 3:21 AM Matthias J. Sax <mj...@apache.org>
> >>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hey Kirk,
> >>>>>>>>
> >>>>>>>> can you elaborate on a few points?
> >>>>>>>>
> >>>>>>>>> Otherwise users would have to know to explicitly change their
> code
> >> to
> >>>>>>>> invoke flush().
> >>>>>>>>
> >>>>>>>> Why? If we would add an option to `flush(FlushOption)`, the
> existing
> >>>>>>>> `flush()` w/o any option will still be there, right? If we would
> >> really
> >>>>>>>> deprecate existing `flush()`, it would just mean that we would
> pass
> >>>>>>>> "default FlushOption" into an implicit flush (and yes, we would
> >> need to
> >>>>>>>> define what this would be).
> >>>>>>>>
> >>>>>>>> I think there is no clear winner (as pointed out in my last
> reply),
> >> and
> >>>>>>>> both `flush(FlushOption)` and `commitTx(CommitOption)` have
> >> advantages
> >>>>>>>> and drawbacks. Guess we need to just agree on which tradeoff we
> >> want to
> >>>>>>>> move forward with?
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Not sure if your database example is a 1:1 fit? I think, the
> better
> >>>>>>>> comparison would be:
> >>>>>>>>
> >>>>>>>> BEGIN TX;
> >>>>>>>> INSERT INTO foo VALUES ('a');
> >>>>>>>> INSERT INTO foo VALUES ('b');
> >>>>>>>> INSERT INTO foo VALUES ('c');
> >>>>>>>> INSERT INTO foo VALUES ('not sure');
> >>>>>>>>
> >>>>>>>> For this case, the full TX would roll back, right? I still think
> >> that
> >>>>>>>> allowing users to just skip over the last error, and continue the
> TX
> >>>>>>>> would be ok. In the end, we provide a programmatic API, and not a
> >>>>>>>> declarative one as SQL. Of course, default behavior would still be
> >> to
> >>>>>>>> put the producer into error state, and the user would need to call
> >>>>>>>> `abortTransaction()` to move forward.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>> On 6/21/24 5:26 PM, Kirk True wrote:
> >>>>>>>>> Hi Matthias,
> >>>>>>>>>
> >>>>>>>>>> On Jun 21, 2024, at 12:28 PM, Matthias J. Sax <mj...@apache.org
> >
> >>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>> If we want to limit it to `RecordTooLargeException` throwing
> from
> >>>>>>>> `send()` directly makes sense. Thanks for calling it out.
> >>>>>>>>>>
> >>>>>>>>>> It's still a question of backward compatibility? `send()` does
> >> throw
> >>>>>>>> exceptions already, including generic `KafkaException`. Not sure
> if
> >> this
> >>>>>>>> helps with backward compatibility? Could we just add a new
> exception
> >>>>>> type
> >>>>>>>> (which is a child of `KafkaException`)?
> >>>>>>>>>>
> >>>>>>>>>> The Producer JavaDocs are not totally explicit about it IMHO.
> >>>>>>>>>>
> >>>>>>>>>> I think we could expect that some generic error handling path
> gets
> >>>>>>>> executed. For the TX-case, I would assume that a TX would be
> >> aborted if
> >>>>>>>> `send()` throws or that the producer would be `closed()`. Overall
> >> this
> >>>>>>>> might be safe?
> >>>>>>>>>>
> >>>>>>>>>>> It would be a little less flexible
> >>>>>>>>>>>> though, since (as you note) it would still be impossible to
> >> commit
> >>>>>>>>>>>> transactions after errors have been reported from brokers.
> >>>>>>>>>>
> >>>>>>>>>> KS would still need a way to clear the error state of the
> >> producer. We
> >>>>>>>> could catch a `RecordTooLargeException` from `send()`, call the
> >> handler
> >>>>>> and
> >>>>>>>> let it decide what to do next. But if it does return `CONTINUE` to
> >>>>>> swallow
> >>>>>>>> the error and drop the poison pill record on the floor, we would
> >> want to
> >>>>>>>> move forward and commit the transaction.
> >>>>>>>>>>
> >>>>>>>>>> But the question is: if we cannot add a record to the tx, does
> the
> >>>>>>>> producer need to go into error state? In the end, we did throw and
> >>>>>> inform
> >>>>>>>> the app that the record was _not_ added, and it's up to the app to
> >>>>>> decide
> >>>>>>>> what to do next?
> >>>>>>>>>
> >>>>>>>>> That’s an excellent question…
> >>>>>>>>>
> >>>>>>>>> Imagine the user’s application is writing information to a
> database
> >>>>>>>> instead of Kafka. If there’s a table with a CHAR(1) column and
> this
> >> SQL
> >>>>>>>> statement was attempted, what should happen?
> >>>>>>>>>
> >>>>>>>>>     INSERT INTO foo VALUES ('not sure');
> >>>>>>>>>
> >>>>>>>>> Yes, that DML would fail, sure, but would the user expect that
> the
> >>>>>>>> connection used by database library would get stuck in some kind
> of
> >>>>>> error
> >>>>>>>> state? A user would be able to catch the error and either continue or
> >>>>>> abort,
> >>>>>>>> based on their business rules.
> >>>>>>>>>
> >>>>>>>>> So I agree with what I believe you’re implying: we shouldn’t
> >> poison the
> >>>>>>>> Producer/TransactionManager on certain types of application-level
> >>>>>> errors in
> >>>>>>>> send().
> >>>>>>>>>
> >>>>>>>>> Kirk
> >>>>>>>>>
> >>>>>>>>>> If we report the error only via the `Callback` it's a different
> >> story,
> >>>>>>>> because the contract for this case is clearly specified on the
> >> JavaDocs:
> >>>>>>>>>>
> >>>>>>>>>>> When used as part of a transaction, it is not necessary to
> >> define a
> >>>>>>>> callback or check the result of the future
> >>>>>>>>>>> in order to detect errors from <code>send</code>. If any of the
> >> send
> >>>>>>>> calls failed with an irrecoverable error,
> >>>>>>>>>>> the final {@link #commitTransaction()} call will fail and throw
> >> the
> >>>>>>>> exception from the last failed send. When
> >>>>>>>>>>> this happens, your application should call {@link
> >>>>>> #abortTransaction()}
> >>>>>>>> to reset the state and continue to send
> >>>>>>>>>>> data.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On 6/21/24 11:42 AM, Chris Egerton wrote:
> >>>>>>>>>>> Hi Artem,
> >>>>>>>>>>> I think it'd make sense to throw directly from send whenever
> >>>>>> possible,
> >>>>>>>>>>> instead of returning an already-completed future. I didn't do
> >> that in
> >>>>>>>> my
> >>>>>>>>>>> bug fix to try to be conservative about breaking changes but
> this
> >>>>>>>> seems to
> >>>>>>>>>>> have caused its own set of headaches. It would be a little less
> >>>>>>>> flexible
> >>>>>>>>>>> though, since (as you note) it would still be impossible to
> >> commit
> >>>>>>>>>>> transactions after errors have been reported from brokers.
> >>>>>>>>>>> I'll leave it up to the Kafka Streams folks to decide if that
> >>>>>>>> flexibility
> >>>>>>>>>>> is required. If it is, then users could explicitly call flush()
> >>>>>> before
> >>>>>>>>>>> committing (and ignoring errors for) or aborting a transaction,
> >> if
> >>>>>> they
> >>>>>>>>>>> want to implement fine-grained error handling logic such as
> >> allowing
> >>>>>>>> errors
> >>>>>>>>>>> for a subset of topics to be ignored.
> >>>>>>>>>>> Hi Matthias,
> >>>>>>>>>>> Most of the time you're right and we can't throw from send();
> >>>>>> however,
> >>>>>>>> in
> >>>>>>>>>>> this case (client-side record-too-large exception), the error
> is
> >>>>>>>> actually
> >>>>>>>>>>> noticed by the producer before send() returns, so it should be
> >>>>>>>> possible to
> >>>>>>>>>>> throw directly.
> >>>>>>>>>>> Cheers,
> >>>>>>>>>>> Chris
> >>>>>>>>>>> On Fri, Jun 21, 2024, 14:25 Matthias J. Sax <mj...@apache.org>
> >>>>>> wrote:
> >>>>>>>>>>>> Not sure if we can change send and make it throw, given that
> >> send()
> >>>>>> is
> >>>>>>>>>>>> async? That is why users can register a `Callback` to begin
> >> with,
> >>>>>>>> right?
> >>>>>>>>>>>>
> >>>>>>>>>>>> And Alieh's point about backward compatibility is also a fair
> >>>>>> concern.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Actually, this would potentially be even
> >>>>>>>>>>>>> worse than the original buggy behavior because the bug was
> >> that we
> >>>>>>>>>>>> ignored
> >>>>>>>>>>>>> errors that happened in the "send()" method itself, not
> >> necessarily
> >>>>>>>> the
> >>>>>>>>>>>>> ones that we got from the broker.
> >>>>>>>>>>>>
> >>>>>>>>>>>> My understanding was that `commitTx(swallowError)` would only
> >>>>>> swallow
> >>>>>>>>>>>> `send()` errors, not errors about the actual commit. I agree
> >> that
> >>>>>> it
> >>>>>>>>>>>> would be very bad to swallow errors about the actual tx
> >> commit...
> >>>>>>>>>>>>
> >>>>>>>>>>>> It's a fair question if this might be too subtle; to make it
> >>>>>> explicit,
> >>>>>>>>>>>> we could use `CommitOptions#ignorePendingSendErrors()` [working
> >> name]
> >>>>>> to
> >>>>>>>>>>>> make it clear.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> If we think it's too subtle to change commit to swallow send()
> >>>>>> errors,
> >>>>>>>>>>>> maybe going with `flush(FlushOptions)` would be clearer (and
> we
> >> can
> >>>>>>>> use
> >>>>>>>>>>>> `FlushOption#swallowSendErrorsForTransactions()` [working
> name]
> >> to
> >>>>>> be
> >>>>>>>>>>>> explicit that the `FlushOption` for now has only an effect
> for
> >>>>>> TX).
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thoughts?
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 6/21/24 4:10 AM, Alieh Saeedi wrote:
> >>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> It is very exciting to see all the experts here raising very
> >> good
> >>>>>>>> points.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> As we go further, we see more and more options to improve our
> >>>>>>>> solution,
> >>>>>>>>>>>>> which makes concluding and updating the KIP impossible.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The main suggestions so far are:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 1. `flush` with `flushOptions` as input parameter
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2. `commitTx` with `commitOptions` as input parameter
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 3. `send` must throw the exception
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> My concern about the 3rd suggestion:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 1. Does the change cause any issue with backward
> compatibility?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2. The `send (bad record)` already transitions the transaction to the
> >>>>>>>>>>>>> error state. No user, including Streams, is able to transition the
> >>>>>>>>>>>>> transaction back from the error state. Do you mean we remove the
> >>>>>>>>>>>>> `maybeTransitionToErrorState(e)` from here
> >>>>>>>>>>>>> <https://github.com/apache/kafka/blob/9b5b434e2a6b2d5290ea403fc02859b1c523d8aa/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1112>
> >>>>>>>>>>>>> as well?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>> Alieh
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Fri, Jun 21, 2024 at 8:45 AM Andrew Schofield <
> >>>>>>>>>>>> andrew_schofi...@live.com>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi Artem,
> >>>>>>>>>>>>>> I think you make a good point which is worth further
> >>>>>> consideration.
> >>>>>>>> If
> >>>>>>>>>>>>>> any of the existing methods is really ripe for a change
> here,
> >> it’s
> >>>>>>>> the
> >>>>>>>>>>>>>> send() that actually caused the problem. If that can be
> fixed
> >> so
> >>>>>>>> there
> >>>>>>>>>>>> are
> >>>>>>>>>>>>>> no situations in which a lurking error breaks a transaction,
> >> that
> >>>>>>>> might
> >>>>>>>>>>>> be
> >>>>>>>>>>>>>> the best.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>> Andrew
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On 21 Jun 2024, at 01:51, Artem Livshits <
> >> alivsh...@confluent.io
> >>>>>>>>>>>> .INVALID>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I thought we still wait for requests (and their errors) to
> >> come
> >>>>>>>> in and
> >>>>>>>>>>>>>>> could handle fatal errors appropriately.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> We do wait for requests, but my understanding is that when
> >>>>>>>>>>>>>>> commitTransaction("ignore send errors") we want to ignore
> >> errors.
> >>>>>>>> So
> >>>>>>>>>>>> if
> >>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>> do
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 1. send
> >>>>>>>>>>>>>>> 2. commitTransaction("ignore send errors")
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> the commit will succeed.  You can look at the example in
> >>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-9279 and just
> >>>>>>>> substitute
> >>>>>>>>>>>>>>> commitTransaction with commitTransaction("ignore send
> >> errors")
> >>>>>> and
> >>>>>>>> we
> >>>>>>>>>>>> get
> >>>>>>>>>>>>>>> the buggy behavior back :-).  Actually, this would
> >> potentially be
> >>>>>>>> even
> >>>>>>>>>>>>>>> worse than the original buggy behavior because the bug was
> >> that
> >>>>>> we
> >>>>>>>>>>>>>> ignored
> >>>>>>>>>>>>>>> errors that happened in the "send()" method itself, not
> >>>>>>>> necessarily the
> >>>>>>>>>>>>>>> ones that we got from the broker.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Actually, looking at
> >>>>>>>> https://github.com/apache/kafka/pull/11508/files,
> >>>>>>>>>>>>>>> wouldn't a better solution be to just throw the error from
> >> the
> >>>>>>>> "send"
> >>>>>>>>>>>>>>> method itself, rather than trying to set it to be thrown
> >> during
> >>>>>>>> commit?
> >>>>>>>>>>>>>>> This way the example in
> >>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-9279
> >>>>>>>>>>>>>>> would be fixed, and at the same time it would give an
> >> opportunity
> >>>>>>>> for
> >>>>>>>>>>>> KS
> >>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>> catch the error and ignore it if needed.  Not sure if we
> >> need a
> >>>>>>>> KIP for
> >>>>>>>>>>>>>>> that, just do a better fix of the old bug.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> -Artem
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Thu, Jun 20, 2024 at 4:58 PM Justine Olshan
> >>>>>>>>>>>>>> <jols...@confluent.io.invalid>
> >>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I'm a bit late to the party, but the discussion here looks
> >>>>>>>> reasonable.
> >>>>>>>>>>>>>>>> Moving the logic to a transactional method makes sense to
> >> me and
> >>>>>>>> makes
> >>>>>>>>>>>>>> me
> >>>>>>>>>>>>>>>> feel a bit better about keeping the complexity in the
> >> methods
> >>>>>>>> relevant
> >>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>> the issue.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> One minor concern is that if we set "ignore send
> >>>>>>>>>>>>>>>> errors" (or whatever we decide to name it) option without
> >>>>>> explicit
> >>>>>>>>>>>>>> flush,
> >>>>>>>>>>>>>>>> it'll actually lead to broken behavior as the application
> >> won't
> >>>>>> be
> >>>>>>>>>>>> able
> >>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>> stop a commit from proceeding even on fatal errors.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Is this with respect to the case a request is still
> inflight
> >>>>>> when
> >>>>>>>> we
> >>>>>>>>>>>>>> call
> >>>>>>>>>>>>>>>> commitTransaction? I thought we still wait for requests
> (and
> >>>>>> their
> >>>>>>>>>>>>>> errors)
> >>>>>>>>>>>>>>>> to come in and could handle fatal errors appropriately.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Justine
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Thu, Jun 20, 2024 at 4:32 PM Artem Livshits
> >>>>>>>>>>>>>>>> <alivsh...@confluent.io.invalid> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Hi Matthias (and other folks who suggested ideas),
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> maybe `commitTransaction(CommitOptions)` or similar
> could
> >> be a
> >>>>>>>> good
> >>>>>>>>>>>>>>>> way
> >>>>>>>>>>>>>>>>> forward?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I like this approach.  One minor concern is that if we
> set
> >>>>>>>> "ignore
> >>>>>>>>>>>> send
> >>>>>>>>>>>>>>>>> errors" (or whatever we decide to name it) option without
> >>>>>>>> explicit
> >>>>>>>>>>>>>> flush,
> >>>>>>>>>>>>>>>>> it'll actually lead to broken behavior as the application
> >> won't
> >>>>>>>> be
> >>>>>>>>>>>> able
> >>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>> stop a commit from proceeding even on fatal errors.  But
> I
> >>>>>> guess
> >>>>>>>>>>>> we'll
> >>>>>>>>>>>>>>>> just
> >>>>>>>>>>>>>>>>> have to clearly document it.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> In some way we are basically adding a flag to optionally
> >>>>>> restore
> >>>>>>>> the
> >>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-9279 bug,
> >> which is
> >>>>>>>> the
> >>>>>>>>>>>>>>>>> motivation for all these changes, anyway :-).
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> -Artem
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Thu, Jun 20, 2024 at 2:18 PM Matthias J. Sax <
> >>>>>>>> mj...@apache.org>
> >>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Seems the option to use a config does not get a lot of
> >>>>>> support.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> So we need to go with some form of "overload / new
> >> method". I
> >>>>>>>> think
> >>>>>>>>>>>>>>>>>> Chris' point about not coupling it to `flush()` but
> rather
> >>>>>>>>>>>>>>>>>> `commitTransaction()` is actually a very good one; for
> >> non-tx
> >>>>>>>> case,
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> different flush variants would not make sense.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I also like Lianet's idea to pass in some "options"
> >> object, so
> >>>>>>>> maybe
> >>>>>>>>>>>>>>>>>> `commitTransaction(CommitOptions)` or similar could be a
> >> good
> >>>>>>>> way
> >>>>>>>>>>>>>>>>>> forward? It's much better than a `boolean` parameter,
> >>>>>>>> aesthetically,
> >>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>>> well as extendable in the future if necessary.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Given that we would pass in an optional parameter, we
> >> might
> >>>>>> not
> >>>>>>>> even
> >>>>>>>>>>>>>>>>>> need to deprecate the existing `commitTransaction()`
> >> method?
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On 6/20/24 9:12 AM, Andrew Schofield wrote:
> >>>>>>>>>>>>>>>>>>> Hi Alieh,
> >>>>>>>>>>>>>>>>>>> Thanks for the KIP.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I *really* don’t like adding a config which changes the
> >>>>>>>> behaviour
> >>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> flush() method. We already have too many configs. But I
> >>>>>> totally
> >>>>>>>>>>>>>>>>>> understand
> >>>>>>>>>>>>>>>>>>> the problem that you’re trying to solve and some of the
> >> other
> >>>>>>>>>>>>>>>>> suggestions
> >>>>>>>>>>>>>>>>>>> in this thread seem neater.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Personally, I would add another method to
> KafkaProducer.
> >> Not
> >>>>>> an
> >>>>>>>>>>>>>>>>> overload
> >>>>>>>>>>>>>>>>>>> on flush() because this is not flush() at all. Using
> >>>>>> Matthias’s
> >>>>>>>>>>>>>>>>> options,
> >>>>>>>>>>>>>>>>>>> I prefer (3).
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>> Andrew
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On 20 Jun 2024, at 15:08, Lianet M. <
> liane...@gmail.com
> >>>
> >>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Hi all, thanks for the KIP Alieh!
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> LM1. Totally agree with Artem's point about the config
> >> not
> >>>>>>>> being
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> most
> >>>>>>>>>>>>>>>>>>>> explicit/flexible way to express this capability.
> >> Getting
> >>>>>>>> then to
> >>>>>>>>>>>>>>>>>> Matthias
> >>>>>>>>>>>>>>>>>>>> 4 options, what I don't like about 3 and 4 is that it
> >> seems
> >>>>>>>> they
> >>>>>>>>>>>>>>>> might
> >>>>>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>>> age very well? Aren't we going to be wanting some
> other
> >>>>>> twist
> >>>>>>>> to
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> flush
> >>>>>>>>>>>>>>>>>>>> semantics that will have us adding yet another param
> to
> >> it,
> >>>>>> or
> >>>>>>>>>>>>>>>> another
> >>>>>>>>>>>>>>>>>>>> overloaded method? I truly don't have the context to
> >> answer
> >>>>>>>> that,
> >>>>>>>>>>>>>>>> but
> >>>>>>>>>>>>>>>>>> if it
> >>>>>>>>>>>>>>>>>>>> feels like a realistic future maybe adding some kind of
> >>>>>>>> FlushOptions
> >>>>>>>>>>>>>>>>>> params to
> >>>>>>>>>>>>>>>>>>>> the flush would be better from an extensibility point
> of
> >>>>>>>> view. It
> >>>>>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>>>>> only have the clearErrors option available for now but
> >> could
> >>>>>>>>>>>> accept
> >>>>>>>>>>>>>>>>> any
> >>>>>>>>>>>>>>>>>>>> other we may need. I find that this would remove the
> >>>>>>>> "ugliness"
> >>>>>>>>>>>>>>>>> Matthias
> >>>>>>>>>>>>>>>>>>>> pointed out for 3. and 4.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> LM2. No matter how we end up expressing the different
> >>>>>>>> semantics
> >>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>> flush,
> >>>>>>>>>>>>>>>>>>>> let's make sure we update the KIP on the flush and
> >>>>>>>>>>>> commitTransaction
> >>>>>>>>>>>>>>>>>> java
> >>>>>>>>>>>>>>>>>>>> docs. It currently states that flush "clears the last
> >>>>>>>> exception"
> >>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>> commitTransaction "will NOT throw" if called after
> >> flush,
> >>>>>> but
> >>>>>>>> it
> >>>>>>>>>>>>>>>>> really
> >>>>>>>>>>>>>>>>>> all
> >>>>>>>>>>>>>>>>>>>> depends on the config/options/method used.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> LM3. I find it would be helpful to include an example
> to
> >>>>>> show
> >>>>>>>> the
> >>>>>>>>>>>>>>>> new
> >>>>>>>>>>>>>>>>>> flow
> >>>>>>>>>>>>>>>>>>>> that we're unblocking (I see this as the great gain
> >> here):
> >>>>>>>> flush
> >>>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>> clear
> >>>>>>>>>>>>>>>>>>>> error option enabled -> catch and do whatever error
> >> handling
> >>>>>>>> we
> >>>>>>>>>>>> want
> >>>>>>>>>>>>>>>>> ->
> >>>>>>>>>>>>>>>>>>>> commitTransaction successfully
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Thanks!
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Lianet
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On Wed, Jun 19, 2024 at 11:26 PM Chris Egerton <
> >>>>>>>>>>>>>>>>> fearthecel...@gmail.com
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Hi Matthias,
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> I like the alternatives you've listed. One more that
> >> might
> >>>>>>>> help
> >>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>> if,
> >>>>>>>>>>>>>>>>>>>>> instead of overloading flush(), we overloaded
> >>>>>>>> commitTransaction()
> >>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>> something like commitTransaction(boolean
> >>>>>>>> tolerateRecordErrors).
> >>>>>>>>>>>>>>>> This
> >>>>>>>>>>>>>>>>>> seems
> >>>>>>>>>>>>>>>>>>>>> slightly cleaner in that it takes the behavioral
> >> change we
> >>>>>>>> want,
> >>>>>>>>>>>>>>>>> which
> >>>>>>>>>>>>>>>>>> only
> >>>>>>>>>>>>>>>>>>>>> applies to transactional producers, to an API method
> >> that
> >>>>>> is
> >>>>>>>> only
> >>>>>>>>>>>>>>>>> used
> >>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>> transactional producers. It would also avoid the
> issue
> >> of
> >>>>>>>> whether
> >>>>>>>>>>>>>>>> or
> >>>>>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>>>> flush() (or a new variant of it with altered
> semantics)
> >>>>>>>> should
> >>>>>>>>>>>>>>>> throw
> >>>>>>>>>>>>>>>>> or
> >>>>>>>>>>>>>>>>>>>>> not. Thoughts?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Hi Alieh,
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Thanks for the KIP, I like this direction a lot more
> >> than
> >>>>>> the
> >>>>>>>>>>>>>>>>> pluggable
> >>>>>>>>>>>>>>>>>>>>> handler!
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> I share Artem's concerns that enabling this behavior
> >> via
> >>>>>>>>>>>>>>>>> configuration
> >>>>>>>>>>>>>>>>>>>>> doesn't seem like a great fit. It's likely that
> >> application
> >>>>>>>> code
> >>>>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>>> written in a style that only works with one type of
> >>>>>> behavior
> >>>>>>>> from
> >>>>>>>>>>>>>>>>>>>>> transactional producers, so requiring that
> application
> >> code
> >>>>>>>> to
> >>>>>>>>>>>>>>>>> declare
> >>>>>>>>>>>>>>>>>> its
> >>>>>>>>>>>>>>>>>>>>> expectations for the behavior of its producer seems
> >> more
> >>>>>>>>>>>>>>>> appropriate
> >>>>>>>>>>>>>>>>>> than,
> >>>>>>>>>>>>>>>>>>>>> e.g., allowing users deploying that application to
> >> tweak a
> >>>>>>>>>>>>>>>>>> configuration
> >>>>>>>>>>>>>>>>>>>>> file that gets fed to producers spun up inside it.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Chris
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On Wed, Jun 19, 2024 at 10:32 PM Matthias J. Sax <
> >>>>>>>>>>>> mj...@apache.org
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP Alieh. I actually like the KIP
> >> as-is,
> >>>>>> but
> >>>>>>>>>>>> think
> >>>>>>>>>>>>>>>>>>>>>> Artem raises very good points...
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Seems we have four options on how to move forward?
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>   1. add config to allow "silent error clearance" as
> >> the
> >>>>>>>> KIP
> >>>>>>>>>>>>>>>>> proposes
> >>>>>>>>>>>>>>>>>>>>>>   2. change flush() to clear error and let it throw
> >>>>>>>>>>>>>>>>>>>>>>   3. add new `flushAndThrow()` (or better name) which
> >>>>>> clears
> >>>>>>>>>>>> error
> >>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>> throws
> >>>>>>>>>>>>>>>>>>>>>>   4. add `flush(boolean clearAndThrow)` and let user
> >> pick
> >>>>>>>> (and
> >>>>>>>>>>>>>>>>>> deprecate
> >>>>>>>>>>>>>>>>>>>>>> existing `flush()`)
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> For (2), given that it would be a behavior change,
> we
> >>>>>> might
> >>>>>>>> also
> >>>>>>>>>>>>>>>>> need
> >>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>>> public "feature flag" config.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> It seems, both (1) and (2) have the issue Artem
> >> mentioned.
> >>>>>>>> (3)
> >>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>> (4)
> >>>>>>>>>>>>>>>>>>>>>> would be safer to this end, however, for both we
> >> kinda get
> >>>>>>>> an
> >>>>>>>>>>>> ugly
> >>>>>>>>>>>>>>>>>> API?
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Not sure right now if I have any preference. Seems
> we
> >> need
> >>>>>>>> to
> >>>>>>>>>>>> pick
> >>>>>>>>>>>>>>>>>> some
> >>>>>>>>>>>>>>>>>>>>>> evil and that there is no clear best solution? Would
> >> be
> >>>>>>>> good to
> >>>>>>>>>>>>>>>> hear
> >>>>>>>>>>>>>>>>>> from
> >>>>>>>>>>>>>>>>>>>>>> others what they think
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> On 6/18/24 8:39 PM, Artem Livshits wrote:
> >>>>>>>>>>>>>>>>>>>>>>> Hi Alieh,
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Thank you for the KIP.  I have a couple of
> >> suggestions:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> AL1.  We should throw an error from flush after we
> >> clear
> >>>>>>>> it.
> >>>>>>>>>>>>>>>> This
> >>>>>>>>>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>>>>>>>> make it so that both "send + commit" and "send +
> >> flush +
> >>>>>>>>>>>> commit"
> >>>>>>>>>>>>>>>>> (the
> >>>>>>>>>>>>>>>>>>>>>>> latter looks like just a more verbose way to
> express
> >> the
> >>>>>>>>>>>> former,
> >>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>>>>> would be intuitive if it behaves the same) would
> >> throw if
> >>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> transaction
> >>>>>>>>>>>>>>>>>>>>>>> has an error (so if the code is written either way
> >> it's
> >>>>>>>> going
> >>>>>>>>>>>> to be
> >>>>>>>>>>>>>>>>>>>>>> correct).
> >>>>>>>>>>>>>>>>>>>>>>> At the same time, the latter could be extended by
> the
> >>>>>>>> caller to
> >>>>>>>>>>>>>>>>>>>>> intercept
> >>>>>>>>>>>>>>>>>>>>>>> exceptions from flush, ignore as needed, and commit
> >> the
> >>>>>>>>>>>>>>>>> transaction.
> >>>>>>>>>>>>>>>>>>>>>> This
> >>>>>>>>>>>>>>>>>>>>>>> solution would keep basic things simple (if someone
> >> has
> >>>>>>>> code
> >>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>> doesn't
> >>>>>>>>>>>>>>>>>>>>>>> require advanced error handling, then basic "send +
> >>>>>> flush +
> >>>>>>>>>>>>>>>> commit"
> >>>>>>>>>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>>>>>>>> do the right thing) and advanced things possible,
> an
> >>>>>>>>>>>> application
> >>>>>>>>>>>>>>>>> can
> >>>>>>>>>>>>>>>>>>>>> add
> >>>>>>>>>>>>>>>>>>>>>>> try + catch around flush and ignore some errors.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> AL2.  I'm not sure if config is the best way to
> >> express
> >>>>>> the
> >>>>>>>>>>>>>>>>>>>>> modification
> >>>>>>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>>> the "flush" semantics -- the application logic that
> >> calls
> >>>>>>>>>>>> "flush"
> >>>>>>>>>>>>>>>>>> needs
> >>>>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>> match the "flush" semantics and configuring
> >> semantics in
> >>>>>> a
> >>>>>>>>>>>>>>>> detached
> >>>>>>>>>>>>>>>>>>>>> place
> >>>>>>>>>>>>>>>>>>>>>>> creates room for bugs due to discrepancies.  This
> >> can
> >>>>>> be
> >>>>>>>>>>>>>>>>> especially
> >>>>>>>>>>>>>>>>>>>>> bad
> >>>>>>>>>>>>>>>>>>>>>>> if the producer loads configuration from a file at
> >> run
> >>>>>>>> time, in
> >>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>> case a
> >>>>>>>>>>>>>>>>>>>>>>> mistake in configuration could break the
> application
> >>>>>>>> because it
> >>>>>>>>>>>>>>>> was
> >>>>>>>>>>>>>>>>>>>>>> written
> >>>>>>>>>>>>>>>>>>>>>>> to expect one "flush" semantics but the semantics
> is
> >>>>>>>> switched.
> >>>>>>>>>>>>>>>>> Given
> >>>>>>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>>> the "flush" semantics needs to match the caller's
> >>>>>>>> expectation,
> >>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>> way
> >>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>> accomplish that would be to pass the caller's
> >> expectation
> >>>>>>>> to
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> "flush"
> >>>>>>>>>>>>>>>>>>>>>>> call by either having a method with a different name
> or
> >>>>>> having
> >>>>>>>> an
> >>>>>>>>>>>>>>>>>> overload
> >>>>>>>>>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>>>>>> a Boolean flag that would configure the semantics
> (the
> >>>>>>>> current
> >>>>>>>>>>>>>>>>> method
> >>>>>>>>>>>>>>>>>>>>>> could
> >>>>>>>>>>>>>>>>>>>>>>> just redirect to the new one).
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> -Artem
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> On Mon, Jun 17, 2024 at 9:09 AM Alieh Saeedi
> >>>>>>>>>>>>>>>>>>>>>> <asae...@confluent.io.invalid>
> >>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> I'd like to kick off a discussion for KIP-1059
> that
> >>>>>>>> suggests
> >>>>>>>>>>>>>>>>> adding
> >>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>>> new
> >>>>>>>>>>>>>>>>>>>>>>>> feature to the Producer flush() method.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1059%3A+Enable+the+Producer+flush%28%29+method+to+clear+the+latest+send%28%29+error
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>>>>>>>> Alieh
> >>
> >>
> >>
> >
>
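
For reference, the flow that LM3 and AL1 describe in the quoted thread (send, flush and throw, handle the error in the caller, then commit) could be sketched roughly as below. This is only an illustration under stated assumptions: `SketchProducer` is a hypothetical in-memory stand-in and is not the Kafka producer API, `flushAndThrow()` is just the placeholder name from option (3), and the rule that an empty record fails is invented so the example has an error to handle.

```java
import java.util.ArrayList;
import java.util.List;

public class FlushThenCommitSketch {

    /** Hypothetical stand-in for a transactional producer; NOT the Kafka API. */
    static class SketchProducer {
        private final List<String> buffered = new ArrayList<>();
        private RuntimeException lastSendError; // remembered until cleared
        private boolean committed;

        void send(String record) {
            // Assumption for this sketch only: an empty record fails to send.
            if (record.isEmpty()) {
                lastSendError = new IllegalStateException("send failed: empty record");
            } else {
                buffered.add(record);
            }
        }

        /** Option (3) from the thread: clear the stored error and throw it. */
        void flushAndThrow() {
            RuntimeException error = lastSendError;
            lastSendError = null; // cleared, so a later commit is not affected
            if (error != null) {
                throw error;
            }
        }

        /** Without a prior flush, a pending send error still fails the commit. */
        void commitTransaction() {
            if (lastSendError != null) {
                throw lastSendError;
            }
            committed = true;
        }

        boolean isCommitted() {
            return committed;
        }
    }

    /** send -> flush and throw -> caller handles the error -> commit. */
    static String run() {
        SketchProducer producer = new SketchProducer();
        producer.send("a");
        producer.send(""); // fails under the sketch's assumption
        String handled = "none";
        try {
            producer.flushAndThrow();
        } catch (IllegalStateException e) {
            handled = e.getMessage(); // the caller decides to ignore this error
        }
        producer.commitTransaction(); // does not throw: the error was cleared
        return (producer.isCommitted() ? "committed" : "not committed")
                + " after handling: " + handled;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch, calling `commitTransaction()` directly after the failed `send` (without the explicit flush) would throw, which matches the point made in the thread that the decision to ignore an error should come after the error has been inspected.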
