Hi Artem,

I think it'd make sense to throw directly from send() whenever possible,
instead of returning an already-completed future. I didn't do that in my
bug fix because I wanted to be conservative about breaking changes, but that
seems to have caused its own set of headaches. Throwing from send() would be
a little less flexible, though, since (as you note) it would still be
impossible to commit transactions after errors have been reported from
brokers.

I'll leave it up to the Kafka Streams folks to decide if that flexibility
is required. If it is, then users could explicitly call flush() before
either committing a transaction (and ignoring its errors) or aborting it,
if they want to implement fine-grained error-handling logic such as
allowing errors for a subset of topics to be ignored.
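
A minimal sketch of that decision logic, for illustration only. It is a toy
model with no Kafka dependency: `FlushThenDecide`, `SendFailure`, `Outcome`
and `decide` are hypothetical names, and it assumes the application has
collected per-record failures (for example from send callbacks) by the time
flush() returns. Whether the producer would then allow the commit is exactly
the open question in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Toy model only: none of these types are Kafka classes.
final class FlushThenDecide {

    /** Hypothetical record of one failed send, as a send callback might capture it. */
    static final class SendFailure {
        final String topic;
        final Exception error;

        SendFailure(String topic, Exception error) {
            this.topic = topic;
            this.error = error;
        }
    }

    enum Outcome { COMMIT, ABORT }

    /**
     * Decide what to do once flush() has returned, i.e. once every pending
     * send has either succeeded or reported its failure.
     * Commit only if every failure belongs to a tolerated topic.
     */
    static Outcome decide(List<SendFailure> failures, Set<String> toleratedTopics) {
        for (SendFailure failure : failures) {
            if (!toleratedTopics.contains(failure.topic)) {
                return Outcome.ABORT;
            }
        }
        return Outcome.COMMIT;
    }

    public static void main(String[] args) {
        Set<String> tolerated = Set.of("audit-log");
        List<SendFailure> failures = new ArrayList<>();

        // A failure on a tolerated topic does not block the commit.
        failures.add(new SendFailure("audit-log", new RuntimeException("record too large")));
        System.out.println(decide(failures, tolerated)); // prints COMMIT

        // A failure on any other topic forces an abort.
        failures.add(new SendFailure("payments", new RuntimeException("record too large")));
        System.out.println(decide(failures, tolerated)); // prints ABORT
    }
}
```

In a real application the COMMIT and ABORT branches would map to
commitTransaction() and abortTransaction() on the producer.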

Hi Matthias,

Most of the time you're right and we can't throw from send(); however, in
this case (client-side record-too-large exception), the error is actually
noticed by the producer before send() returns, so it should be possible to
throw directly.
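
To make the distinction concrete, here is a small sketch contrasting the two
ways of reporting an error that is already known before send() returns. It
is a toy model, not KafkaProducer: `EagerSendSketch`, its two send variants
and the use of IllegalArgumentException as a stand-in for the
record-too-large error are all assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;

// Toy model only: this is not KafkaProducer and does not use the Kafka client library.
final class EagerSendSketch {
    private final int maxRecordBytes;

    EagerSendSketch(int maxRecordBytes) {
        this.maxRecordBytes = maxRecordBytes;
    }

    /** Variant A: report the client-side failure through an already-completed future. */
    CompletableFuture<Integer> sendReturningFailedFuture(byte[] value) {
        if (value.length > maxRecordBytes) {
            return CompletableFuture.failedFuture(
                new IllegalArgumentException("record too large: " + value.length));
        }
        // A real producer would complete this later; the toy completes it at once.
        return CompletableFuture.completedFuture(value.length);
    }

    /** Variant B: throw directly, since the failure is known before send returns. */
    CompletableFuture<Integer> sendThrowing(byte[] value) {
        if (value.length > maxRecordBytes) {
            throw new IllegalArgumentException("record too large: " + value.length);
        }
        return CompletableFuture.completedFuture(value.length);
    }

    public static void main(String[] args) {
        EagerSendSketch producer = new EagerSendSketch(4);
        byte[] tooBig = new byte[8];

        // Variant A: the caller only sees the error if it inspects the future.
        System.out.println(
            producer.sendReturningFailedFuture(tooBig).isCompletedExceptionally()); // prints true

        // Variant B: the caller can catch the error at the call site and move on.
        try {
            producer.sendThrowing(tooBig);
        } catch (IllegalArgumentException e) {
            System.out.println("skipped: " + e.getMessage()); // prints skipped: record too large: 8
        }
    }
}
```

With variant B the caller decides at the call site whether to skip the
record, which is the opportunity to catch and ignore the error that Artem
described.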

Cheers,

Chris

On Fri, Jun 21, 2024, 14:25 Matthias J. Sax <mj...@apache.org> wrote:

> Not sure if we can change send and make it throw, given that send() is
> async? That is why users can register a `Callback` to begin with, right?
>
> And Alieh's point about backward compatibility is also a fair concern.
>
>
> > Actually, this would potentially be even
> > worse than the original buggy behavior because the bug was that we
> ignored
> > errors that happened in the "send()" method itself, not necessarily the
> > ones that we got from the broker.
>
> My understanding was that `commitTx(swallowError)` would only swallow
> `send()` errors, not errors about the actual commit. I agree that it
> would be very bad to swallow errors about the actual tx commit...
>
> It's a fair question if this might be too subtle; to make it explicit,
> we could use `CommitOptions#ignorePendingSendErrors()` [working name] to
> make it clear.
>
>
> If we think it's too subtle to change commit to swallow send() errors,
> maybe going with `flush(FlushOptions)` would be clearer (and we can use
> `FlushOption#swallowSendErrorsForTransactions()` [working name] to be
> explicit that the `FlushOption` for now has only an effect for TX).
>
>
> Thoughts?
>
>
> -Matthias
>
>
>
> On 6/21/24 4:10 AM, Alieh Saeedi wrote:
> > Hi all,
> >
> >
> > It is very exciting to see all the experts here raising very good points.
> >
> > As we go further, we see more and more options to improve our solution,
> > which makes concluding and updating the KIP impossible.
> >
> >
> > The main suggestions so far are:
> >
> > 1. `flush` with `flushOptions` as input parameter
> >
> > 2. `commitTx` with `commitOptions` as input parameter
> >
> > 3. `send` must throw the exception
> >
> >
> > My concern about the 3rd suggestion:
> >
> > 1. Does the change cause any issue with backward compatibility?
> >
> > 2. The `send (bad record)` already transitions the transaction to the error
> > state. No user, including Streams, is able to transition the transaction back
> > from the error state. Do you mean we remove the
> > `maybeTransitionToErrorState(e)` from here
> > <
> https://github.com/apache/kafka/blob/9b5b434e2a6b2d5290ea403fc02859b1c523d8aa/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1112
> >
> > as well?
> >
> > Cheers,
> > Alieh
> >
> >
> > On Fri, Jun 21, 2024 at 8:45 AM Andrew Schofield <
> andrew_schofi...@live.com>
> > wrote:
> >
> >> Hi Artem,
> >> I think you make a good point which is worth further consideration. If
> >> any of the existing methods is really ripe for a change here, it’s the
> >> send() that actually caused the problem. If that can be fixed so there
> are
> >> no situations in which a lurking error breaks a transaction, that might
> be
> >> the best.
> >>
> >> Thanks,
> >> Andrew
> >>
> >>> On 21 Jun 2024, at 01:51, Artem Livshits <alivsh...@confluent.io
> .INVALID>
> >> wrote:
> >>>
> >>>> I thought we still wait for requests (and their errors) to come in and
> >>> could handle fatal errors appropriately.
> >>>
> >>> We do wait for requests, but my understanding is that when
> >>> commitTransaction("ignore send errors") we want to ignore errors.  So
> if
> >> we
> >>> do
> >>>
> >>> 1. send
> >>> 2. commitTransaction("ignore send errors")
> >>>
> >>> the commit will succeed.  You can look at the example in
> >>> https://issues.apache.org/jira/browse/KAFKA-9279 and just substitute
> >>> commitTransaction with commitTransaction("ignore send errors") and we
> get
> >>> the buggy behavior back :-).  Actually, this would potentially be even
> >>> worse than the original buggy behavior because the bug was that we
> >> ignored
> >>> errors that happened in the "send()" method itself, not necessarily the
> >>> ones that we got from the broker.
> >>>
> >>> Actually, looking at https://github.com/apache/kafka/pull/11508/files,
> >>> wouldn't a better solution be to just throw the error from the "send"
> >>> method itself, rather than trying to set it to be thrown during commit?
> >>> This way the example in
> https://issues.apache.org/jira/browse/KAFKA-9279
> >>> would be fixed, and at the same time it would give an opportunity for
> KS
> >> to
> >>> catch the error and ignore it if needed.  Not sure if we need a KIP for
> >>> that, just do a better fix of the old bug.
> >>>
> >>> -Artem
> >>>
> >>> On Thu, Jun 20, 2024 at 4:58 PM Justine Olshan
> >> <jols...@confluent.io.invalid>
> >>> wrote:
> >>>
> >>>> I'm a bit late to the party, but the discussion here looks reasonable.
> >>>> Moving the logic to a transactional method makes sense to me and makes
> >> me
> >>>> feel a bit better about keeping the complexity in the methods relevant
> >> to
> >>>> the issue.
> >>>>
> >>>>> One minor concern is that if we set "ignore send
> >>>> errors" (or whatever we decide to name it) option without explicit
> >> flush,
> >>>> it'll actually lead to broken behavior as the application won't be
> able
> >> to
> >>>> stop a commit from proceeding even on fatal errors.
> >>>>
> >>>> Is this with respect to the case a request is still inflight when we
> >> call
> >>>> commitTransaction? I thought we still wait for requests (and their
> >> errors)
> >>>> to come in and could handle fatal errors appropriately.
> >>>>
> >>>> Justine
> >>>>
> >>>> On Thu, Jun 20, 2024 at 4:32 PM Artem Livshits
> >>>> <alivsh...@confluent.io.invalid> wrote:
> >>>>
> >>>>> Hi Matthias (and other folks who suggested ideas),
> >>>>>
> >>>>>> maybe `commitTransaction(CommitOptions)` or similar could be a good
> >>>> way
> >>>>> forward?
> >>>>>
> >>>>> I like this approach.  One minor concern is that if we set "ignore
> send
> >>>>> errors" (or whatever we decide to name it) option without explicit
> >> flush,
> >>>>> it'll actually lead to broken behavior as the application won't be
> able
> >>>> to
> >>>>> stop a commit from proceeding even on fatal errors.  But I guess
> we'll
> >>>> just
> >>>>> have to clearly document it.
> >>>>>
> >>>>> In some way we are basically adding a flag to optionally restore the
> >>>>> https://issues.apache.org/jira/browse/KAFKA-9279 bug, which is the
> >>>>> motivation for all these changes, anyway :-).
> >>>>>
> >>>>> -Artem
> >>>>>
> >>>>>
> >>>>> On Thu, Jun 20, 2024 at 2:18 PM Matthias J. Sax <mj...@apache.org>
> >>>> wrote:
> >>>>>
> >>>>>> Seems the option to use a config does not get a lot of support.
> >>>>>>
> >>>>>> So we need to go with some form of "overload / new method". I think
> >>>>>> Chris' point about not coupling it to `flush()` but rather
> >>>>>> `commitTransaction()` is actually a very good one; for non-tx case,
> >> the
> >>>>>> different flush variants would not make sense.
> >>>>>>
> >>>>>> I also like Lianet's idea to pass in some "options" object, so maybe
> >>>>>> `commitTransaction(CommitOptions)` or similar could be a good way
> >>>>>> forward? It's much better than a `boolean` parameter, aesthetically,
> >> as
> >>>>>> well as extendable in the future if necessary.
> >>>>>>
> >>>>>> Given that we would pass in an optional parameter, we might not even
> >>>>>> need to deprecate the existing `commitTransaction()` method?
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 6/20/24 9:12 AM, Andrew Schofield wrote:
> >>>>>>> Hi Alieh,
> >>>>>>> Thanks for the KIP.
> >>>>>>>
> >>>>>>> I *really* don’t like adding a config which changes the behaviour
> of
> >>>>> the
> >>>>>>> flush() method. We already have too many configs. But I totally
> >>>>>> understand
> >>>>>>> the problem that you’re trying to solve and some of the other
> >>>>> suggestions
> >>>>>>> in this thread seem neater.
> >>>>>>>
> >>>>>>> Personally, I would add another method to KafkaProducer. Not an
> >>>>> overload
> >>>>>>> on flush() because this is not flush() at all. Using Matthias’s
> >>>>> options,
> >>>>>>> I prefer (3).
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Andrew
> >>>>>>>
> >>>>>>>> On 20 Jun 2024, at 15:08, Lianet M. <liane...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>> Hi all, thanks for the KIP Alieh!
> >>>>>>>>
> >>>>>>>> LM1. Totally agree with Artem's point about the config not being
> the
> >>>>>> most
> >>>>>>>> explicit/flexible way to express this capability. Getting then to
> >>>>>> Matthias
> >>>>>>>> 4 options, what I don't like about 3 and 4 is that it seems they
> >>>> might
> >>>>>> not
> >>>>>>>> age very well? Aren't we going to be wanting some other twist to
> the
> >>>>>> flush
> >>>>>>>> semantics that will have us adding yet another param to it, or
> >>>> another
> >>>>>>>> overloaded method? I truly don't have the context to answer that,
> >>>> but
> >>>>>> if it
> >>>>>>>> feels like a realistic future maybe adding some kind of FlushOptions
> >>>>>> params to
> >>>>>>>> the flush would be better from an extensibility point of view. It
> >>>>> would
> >>>>>>>> only have the clearErrors option available for now but could
> accept
> >>>>> any
> >>>>>>>> other we may need. I find that this would remove the "ugliness"
> >>>>> Matthias
> >>>>>>>> pointed out for 3. and 4.
> >>>>>>>>
> >>>>>>>> LM2. No matter how we end up expressing the different semantics
> for
> >>>>>> flush,
> >>>>>>>> let's make sure we update the KIP on the flush and
> commitTransaction
> >>>>>> java
> >>>>>>>> docs. It currently states that  flush "clears the last exception"
> >>>> and
> >>>>>>>> commitTransaction "will NOT throw" if called after flush, but it
> >>>>> really
> >>>>>> all
> >>>>>>>> depends on the config/options/method used.
> >>>>>>>>
> >>>>>>>> LM3. I find it would be helpful to include an example to show the
> >>>> new
> >>>>>> flow
> >>>>>>>> that we're unblocking (I see this as the great gain here): flush
> >>>> with
> >>>>>> clear
> >>>>>>>> error option enabled -> catch and do whatever error handling we
> want
> >>>>> ->
> >>>>>>>> commitTransaction successfully
> >>>>>>>>
> >>>>>>>> Thanks!
> >>>>>>>>
> >>>>>>>> Lianet
> >>>>>>>>
> >>>>>>>> On Wed, Jun 19, 2024 at 11:26 PM Chris Egerton <
> >>>>> fearthecel...@gmail.com
> >>>>>>>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Matthias,
> >>>>>>>>>
> >>>>>>>>> I like the alternatives you've listed. One more that might help
> is
> >>>>> if,
> >>>>>>>>> instead of overloading flush(), we overloaded commitTransaction()
> >>>> to
> >>>>>>>>> something like commitTransaction(boolean tolerateRecordErrors).
> >>>> This
> >>>>>> seems
> >>>>>>>>> slightly cleaner in that it takes the behavioral change we want,
> >>>>> which
> >>>>>> only
> >>>>>>>>> applies to transactional producers, to an API method that is only
> >>>>> used
> >>>>>> for
> >>>>>>>>> transactional producers. It would also avoid the issue of whether
> >>>> or
> >>>>>> not
> >>>>>>>>> flush() (or a new variant of it with altered semantics) should
> >>>> throw
> >>>>> or
> >>>>>>>>> not. Thoughts?
> >>>>>>>>>
> >>>>>>>>> Hi Alieh,
> >>>>>>>>>
> >>>>>>>>> Thanks for the KIP, I like this direction a lot more than the
> >>>>> pluggable
> >>>>>>>>> handler!
> >>>>>>>>>
> >>>>>>>>> I share Artem's concerns that enabling this behavior via
> >>>>> configuration
> >>>>>>>>> doesn't seem like a great fit. It's likely that application code
> >>>> will
> >>>>>> be
> >>>>>>>>> written in a style that only works with one type of behavior from
> >>>>>>>>> transactional producers, so requiring that application code to
> >>>>> declare
> >>>>>> its
> >>>>>>>>> expectations for the behavior of its producer seems more
> >>>> appropriate
> >>>>>> than,
> >>>>>>>>> e.g., allowing users deploying that application to tweak a
> >>>>>> configuration
> >>>>>>>>> file that gets fed to producers spun up inside it.
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>>
> >>>>>>>>> Chris
> >>>>>>>>>
> >>>>>>>>> On Wed, Jun 19, 2024 at 10:32 PM Matthias J. Sax <
> mj...@apache.org
> >>>>>
> >>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Thanks for the KIP Alieh. I actually like the KIP as-is, but
> think
> >>>>>>>>>> Artem raises very good points...
> >>>>>>>>>>
> >>>>>>>>>> Seems we have four options on how to move forward?
> >>>>>>>>>>
> >>>>>>>>>>   1. add config to allow "silent error clearance" as the KIP
> >>>>> proposes
> >>>>>>>>>>   2. change flush() to clear error and let it throw
> >>>>>>>>>>   3. add new `flushAndThrow()` (or better name) which clears
> error
> >>>>> and
> >>>>>>>>>> throws
> >>>>>>>>>>   4. add `flush(boolean clearAndThrow)` and let user pick (and
> >>>>>> deprecate
> >>>>>>>>>> existing `flush()`)
> >>>>>>>>>>
> >>>>>>>>>> For (2), given that it would be a behavior change, we might also
> >>>>> need
> >>>>>> a
> >>>>>>>>>> public "feature flag" config.
> >>>>>>>>>>
> >>>>>>>>>> It seems, both (1) and (2) have the issue Artem mentioned. (3)
> and
> >>>>> (4)
> >>>>>>>>>> would be safer to this end, however, for both we kinda get an
> ugly
> >>>>>> API?
> >>>>>>>>>>
> >>>>>>>>>> Not sure right now if I have any preference. Seems we need to
> pick
> >>>>>> some
> >>>>>>>>>> evil and that there is no clear best solution? Would be good to
> >>>> hear
> >>>>>> from
> >>>>>>>>>> others what they think
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On 6/18/24 8:39 PM, Artem Livshits wrote:
> >>>>>>>>>>> Hi Alieh,
> >>>>>>>>>>>
> >>>>>>>>>>> Thank you for the KIP.  I have a couple of suggestions:
> >>>>>>>>>>>
> >>>>>>>>>>> AL1.  We should throw an error from flush after we clear it.
> >>>> This
> >>>>>>>>> would
> >>>>>>>>>>> make it so that both "send + commit" and "send + flush +
> commit"
> >>>>> (the
> >>>>>>>>>>> latter looks like just a more verbose way to express the
> former,
> >>>>> and
> >>>>>> it
> >>>>>>>>>>> would be intuitive if it behaves the same) would throw if the
> >>>>>>>>> transaction
> >>>>>>>>>>> has an error (so if the code is written either way it's going to
> be
> >>>>>>>>>> correct).
> >>>>>>>>>>> At the same time, the latter could be extended by the caller to
> >>>>>>>>> intercept
> >>>>>>>>>>> exceptions from flush, ignore as needed, and commit the
> >>>>> transaction.
> >>>>>>>>>> This
> >>>>>>>>>>> solution would keep basic things simple (if someone has code
> that
> >>>>>>>>> doesn't
> >>>>>>>>>>> require advanced error handling, then basic "send + flush +
> >>>> commit"
> >>>>>>>>> would
> >>>>>>>>>>> do the right thing) and advanced things possible, an
> application
> >>>>> can
> >>>>>>>>> add
> >>>>>>>>>>> try + catch around flush and ignore some errors.
> >>>>>>>>>>>
> >>>>>>>>>>> AL2.  I'm not sure if config is the best way to express the
> >>>>>>>>> modification
> >>>>>>>>>> of
> >>>>>>>>>>> the "flush" semantics -- the application logic that calls
> "flush"
> >>>>>> needs
> >>>>>>>>>> to
> >>>>>>>>>>> match the "flush" semantics and configuring semantics in a
> >>>> detached
> >>>>>>>>> place
> >>>>>>>>>>> creates room for bugs due to discrepancies.  This can be
> >>>>> especially
> >>>>>>>>> bad
> >>>>>>>>>>> if the producer loads configuration from a file at run time, in
> >>>>> that
> >>>>>>>>>> case a
> >>>>>>>>>>> mistake in configuration could break the application because it
> >>>> was
> >>>>>>>>>> written
> >>>>>>>>>>> to expect one "flush" semantics but the semantics is switched.
> >>>>> Given
> >>>>>>>>>> that
> >>>>>>>>>>> the "flush" semantics needs to match the caller's expectation,
> a
> >>>>> way
> >>>>>> to
> >>>>>>>>>>> accomplish that would be to pass the caller's expectation to
> the
> >>>>>>>>> "flush"
> >>>>>>>>>>> call by either have a method with a different name or have an
> >>>>>> overload
> >>>>>>>>>> with
> >>>>>>>>>>> a Boolean flag that would configure the semantics (the current
> >>>>> method
> >>>>>>>>>> could
> >>>>>>>>>>> just redirect to the new one).
> >>>>>>>>>>>
> >>>>>>>>>>> -Artem
> >>>>>>>>>>>
> >>>>>>>>>>> On Mon, Jun 17, 2024 at 9:09 AM Alieh Saeedi
> >>>>>>>>>> <asae...@confluent.io.invalid>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I'd like to kick off a discussion for KIP-1059 that suggests
> >>>>> adding
> >>>>>> a
> >>>>>>>>>> new
> >>>>>>>>>>>> feature to the Producer flush() method.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1059%3A+Enable+the+Producer+flush%28%29+method+to+clear+the+latest+send%28%29+error
> >>>>>>>>>>>>
> >>>>>>>>>>>> Cheers,
> >>>>>>>>>>>> Alieh
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>
> >>
> >
>
