Hey Kirk,

can you elaborate on a few points?

Otherwise users would have to know to explicitly change their code to invoke 
flush().

Why? If we add an option via `flush(FlushOption)`, the existing `flush()` without any option will still be there, right? If we really deprecated the existing `flush()`, it would just mean that we pass a "default FlushOption" into an implicit flush (and yes, we would need to define what this default would be).

I think there is no clear winner (as pointed out in my last reply), and both `flush(FlushOption)` and `commitTx(CommitOption)` have advantages and drawbacks. Guess we just need to agree on which tradeoff we want to move forward with?


Not sure if your database example is a 1:1 fit? I think the better comparison would be:

BEGIN TX;
INSERT INTO foo VALUES ('a');
INSERT INTO foo VALUES ('b');
INSERT INTO foo VALUES ('c');
INSERT INTO foo VALUES ('not sure');

For this case, the full TX would roll back, right? I still think that allowing users to just skip over the last error and continue the TX would be ok. In the end, we provide a programmatic API, not a declarative one like SQL. Of course, the default behavior would still be to put the producer into error state, and the user would need to call `abortTransaction()` to move forward.
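To make the "skip and continue" idea concrete, here is a rough, self-contained sketch. The `StubTxProducer` and all names below are hypothetical stand-ins, not the real producer API; the point is only the control flow: `send()` throws, the app swallows the error, and the TX commits with the records that did fit.

```java
import java.util.ArrayList;
import java.util.List;

// Self-contained sketch (not the real client API): catch a send-time
// error, drop the poison-pill record, and keep the transaction alive.
public class SkipPoisonPillSketch {

    static class RecordTooLargeException extends RuntimeException {
        RecordTooLargeException(String message) { super(message); }
    }

    // Minimal stub of a transactional producer where send() itself throws
    // for oversized records instead of poisoning the transaction state.
    static class StubTxProducer {
        final List<String> pending = new ArrayList<>();
        List<String> committed = List.of();

        void send(String value) {
            if (value.length() > 3) {            // pretend max.request.size is tiny
                throw new RecordTooLargeException(value);
            }
            pending.add(value);
        }

        void commitTransaction() { committed = List.copyOf(pending); }
    }

    // Send all values, skipping any record that is too large, then commit.
    static List<String> produceSkippingOversized(StubTxProducer producer, List<String> values) {
        for (String value : values) {
            try {
                producer.send(value);
            } catch (RecordTooLargeException e) {
                // Application decision: swallow the error and continue the TX.
                System.out.println("dropping oversized record: " + e.getMessage());
            }
        }
        producer.commitTransaction();
        return producer.committed;
    }

    public static void main(String[] args) {
        StubTxProducer producer = new StubTxProducer();
        System.out.println(produceSkippingOversized(producer, List.of("a", "b", "c", "not sure")));
        // prints: [a, b, c]
    }
}
```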


-Matthias

On 6/21/24 5:26 PM, Kirk True wrote:
Hi Matthias,

On Jun 21, 2024, at 12:28 PM, Matthias J. Sax <mj...@apache.org> wrote:

If we want to limit it to `RecordTooLargeException` thrown from `send()` 
directly, that makes sense. Thanks for calling it out.

Is it really a backward compatibility issue, though? `send()` already throws 
exceptions, including generic `KafkaException`. Could we just add a new 
exception type (which is a child of `KafkaException`)?

The Producer JavaDocs are not totally explicit about it IMHO.

I think we could expect that some generic error handling path gets executed. 
For the TX case, I would assume that the TX would be aborted if `send()` 
throws, or that the producer would be closed. Overall this might be safe?

It would be a little less flexible
though, since (as you note) it would still be impossible to commit
transactions after errors have been reported from brokers.

KS would still need a way to clear the error state of the producer. We could 
catch a `RecordTooLargeException` from `send()`, call the handler, and let it 
decide what to do next. But if it returns `CONTINUE` to swallow the error 
and drop the poison-pill record on the floor, we would want to move forward 
and commit the transaction.

But the question is: if we cannot add a record to the tx, does the producer 
need to go into error state? In the end, we did throw and inform the app that 
the record was _not_ added, and it's up to the app to decide what to do next?

That’s an excellent question…

Imagine the user’s application is writing information to a database instead of 
Kafka. If there’s a table with a CHAR(1) column and this SQL statement was 
attempted, what should happen?

     INSERT INTO foo VALUES ('not sure');

Yes, that DML would fail, sure, but would the user expect that the connection 
used by the database library would get stuck in some kind of error state? A 
user would be able to catch the error and either continue or abort, based on 
their business rules.

So I agree with what I believe you’re implying: we shouldn’t poison the 
Producer/TransactionManager on certain types of application-level errors in 
send().

Kirk

If we report the error only via the `Callback` it's a different story, because 
the contract for this case is clearly specified on the JavaDocs:

When used as part of a transaction, it is not necessary to define a callback or 
check the result of the future
in order to detect errors from <code>send</code>. If any of the send calls 
failed with an irrecoverable error,
the final {@link #commitTransaction()} call will fail and throw the exception 
from the last failed send. When
this happens, your application should call {@link #abortTransaction()} to reset 
the state and continue to send
data.
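For contrast, the contract quoted above could be sketched roughly like this. The stub is a hypothetical stand-in, not the real client: a failed send is remembered rather than thrown, `commitTransaction()` throws the last send error, and the application must abort (losing the whole TX, including the good records).

```java
import java.util.ArrayList;
import java.util.List;

// Self-contained sketch of the documented Callback-era contract: the send
// error surfaces at commit, and abortTransaction() resets the state.
public class CommitThrowsSketch {

    static class StubTxProducer {
        final List<String> pending = new ArrayList<>();
        RuntimeException lastSendError;
        boolean aborted;

        void send(String value) {
            if (value.length() > 3) {
                // The error is recorded, not thrown; the TX is now poisoned.
                lastSendError = new RuntimeException("RecordTooLargeException: " + value);
                return;
            }
            pending.add(value);
        }

        void commitTransaction() {
            if (lastSendError != null) {
                throw lastSendError;   // "throw the exception from the last failed send"
            }
        }

        void abortTransaction() {      // "reset the state and continue to send data"
            lastSendError = null;
            pending.clear();
            aborted = true;
        }
    }

    public static void main(String[] args) {
        StubTxProducer producer = new StubTxProducer();
        producer.send("a");
        producer.send("not sure");         // too large: recorded, not thrown
        try {
            producer.commitTransaction();  // fails because of the earlier send
        } catch (RuntimeException e) {
            producer.abortTransaction();   // the whole TX is lost, including "a"
        }
        System.out.println("aborted=" + producer.aborted);
    }
}
```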



-Matthias


On 6/21/24 11:42 AM, Chris Egerton wrote:
Hi Artem,
I think it'd make sense to throw directly from send whenever possible,
instead of returning an already-completed future. I didn't do that in my
bug fix to try to be conservative about breaking changes but this seems to
have caused its own set of headaches. It would be a little less flexible
though, since (as you note) it would still be impossible to commit
transactions after errors have been reported from brokers.
I'll leave it up to the Kafka Streams folks to decide if that flexibility
is required. If it is, then users could explicitly call flush() before
committing (ignoring any errors) or aborting a transaction, if they
want to implement fine-grained error handling logic such as allowing errors
for a subset of topics to be ignored.
Hi Matthias,
Most of the time you're right and we can't throw from send(); however, in
this case (client-side record-too-large exception), the error is actually
noticed by the producer before send() returns, so it should be possible to
throw directly.
Cheers,
Chris
On Fri, Jun 21, 2024, 14:25 Matthias J. Sax <mj...@apache.org> wrote:
Not sure if we can change send and make it throw, given that send() is
async? That is why users can register a `Callback` to begin with, right?

And Alieh's point about backward compatibility is also a fair concern.


Actually, this would potentially be even
worse than the original buggy behavior because the bug was that we ignored
errors that happened in the "send()" method itself, not necessarily the
ones that we got from the broker.

My understanding was that `commitTx(swallowError)` would only swallow
`send()` errors, not errors about the actual commit. I agree that it
would be very bad to swallow errors about the actual tx commit...

It's a fair question if this might be too subtle; to make it explicit,
we could use `CommitOptions#ignorePendingSendErrors()` [working name].


If we think it's too subtle to change commit to swallow send() errors,
maybe going with `flush(FlushOptions)` would be clearer (and we can use
`FlushOptions#swallowSendErrorsForTransactions()` [working name] to make it
explicit that the `FlushOptions` for now only has an effect for TX).
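For illustration, the two option objects discussed in this thread might look roughly like this. All names are [working name]s; nothing here is a committed API, just a sketch of the builder-style shape.

```java
// Hypothetical option-object shapes for the two proposed overloads.
public class OptionsSketch {

    // Option object for a commitTransaction(CommitOptions) overload.
    static final class CommitOptions {
        private boolean ignorePendingSendErrors = false;

        CommitOptions ignorePendingSendErrors() {
            this.ignorePendingSendErrors = true;
            return this;
        }

        boolean shouldIgnorePendingSendErrors() {
            return ignorePendingSendErrors;
        }
    }

    // Option object for a flush(FlushOptions) overload; for now the option
    // would only have an effect for transactional producers.
    static final class FlushOptions {
        private boolean swallowSendErrorsForTransactions = false;

        FlushOptions swallowSendErrorsForTransactions() {
            this.swallowSendErrorsForTransactions = true;
            return this;
        }

        boolean shouldSwallowSendErrorsForTransactions() {
            return swallowSendErrorsForTransactions;
        }
    }

    public static void main(String[] args) {
        // Defaults preserve today's behavior; swallowing must be opted into.
        System.out.println(new CommitOptions().shouldIgnorePendingSendErrors());   // false
        System.out.println(new CommitOptions().ignorePendingSendErrors()
                .shouldIgnorePendingSendErrors());                                 // true
    }
}
```

Either way, the default (no options / empty options) would keep the current behavior, so existing callers are unaffected.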


Thoughts?


-Matthias



On 6/21/24 4:10 AM, Alieh Saeedi wrote:
Hi all,


It is very exciting to see all the experts here raising very good points.

As we go further, we see more and more options to improve our solution,
which makes concluding and updating the KIP impossible.


The main suggestions so far are:

1. `flush` with `flushOptions` as input parameter

2. `commitTx` with `commitOptions` as input parameter

3. `send` must throw the exception


My concern about the 3rd suggestion:

1. Does the change cause any issue with backward compatibility?

2. The `send(bad record)` already transitions the transaction to the error
state. No user, including Streams, is able to transition the transaction back
from the error state. Do you mean we remove the
`maybeTransitionToErrorState(e)` from here
<https://github.com/apache/kafka/blob/9b5b434e2a6b2d5290ea403fc02859b1c523d8aa/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1112>
as well?

Cheers,
Alieh


On Fri, Jun 21, 2024 at 8:45 AM Andrew Schofield <
andrew_schofi...@live.com>
wrote:

Hi Artem,
I think you make a good point which is worth further consideration. If
any of the existing methods is really ripe for a change here, it’s the
send() that actually caused the problem. If that can be fixed so there are
no situations in which a lurking error breaks a transaction, that might be
the best.

Thanks,
Andrew

On 21 Jun 2024, at 01:51, Artem Livshits <alivsh...@confluent.io
.INVALID>
wrote:

I thought we still wait for requests (and their errors) to come in and
could handle fatal errors appropriately.

We do wait for requests, but my understanding is that when we call
commitTransaction("ignore send errors") we want to ignore errors.  So if we do

1. send
2. commitTransaction("ignore send errors")

the commit will succeed.  You can look at the example in
https://issues.apache.org/jira/browse/KAFKA-9279 and just substitute
commitTransaction with commitTransaction("ignore send errors") and we get
the buggy behavior back :-).  Actually, this would potentially be even
worse than the original buggy behavior because the bug was that we ignored
errors that happened in the "send()" method itself, not necessarily the
ones that we got from the broker.

Actually, looking at https://github.com/apache/kafka/pull/11508/files,
wouldn't a better solution be to just throw the error from the "send"
method itself, rather than trying to set it to be thrown during commit?
This way the example in https://issues.apache.org/jira/browse/KAFKA-9279
would be fixed, and at the same time it would give an opportunity for KS
to catch the error and ignore it if needed.  Not sure if we need a KIP
for that, just do a better fix of the old bug.

-Artem

On Thu, Jun 20, 2024 at 4:58 PM Justine Olshan
<jols...@confluent.io.invalid>
wrote:

I'm a bit late to the party, but the discussion here looks reasonable.
Moving the logic to a transactional method makes sense to me and makes me
feel a bit better about keeping the complexity in the methods relevant to
the issue.

One minor concern is that if we set "ignore send errors" (or whatever we
decide to name it) option without explicit flush, it'll actually lead to
broken behavior as the application won't be able to stop a commit from
proceeding even on fatal errors.

Is this with respect to the case where a request is still inflight when we
call commitTransaction? I thought we still wait for requests (and their
errors) to come in and could handle fatal errors appropriately.

Justine

On Thu, Jun 20, 2024 at 4:32 PM Artem Livshits
<alivsh...@confluent.io.invalid> wrote:

Hi Matthias (and other folks who suggested ideas),

maybe `commitTransaction(CommitOptions)` or similar could be a good way
forward?

I like this approach.  One minor concern is that if we set "ignore send
errors" (or whatever we decide to name it) option without explicit flush,
it'll actually lead to broken behavior as the application won't be able to
stop a commit from proceeding even on fatal errors.  But I guess we'll just
have to clearly document it.

In some way we are basically adding a flag to optionally restore the
https://issues.apache.org/jira/browse/KAFKA-9279 bug, which is the
motivation for all these changes, anyway :-).

-Artem


On Thu, Jun 20, 2024 at 2:18 PM Matthias J. Sax <mj...@apache.org>
wrote:

Seems the option to use a config does not get a lot of support.

So we need to go with some form of "overload / new method". I think
Chris' point about not coupling it to `flush()` but rather to
`commitTransaction()` is actually a very good one; for the non-tx case,
the different flush variants would not make sense.

I also like Lianet's idea to pass in some "options" object, so maybe
`commitTransaction(CommitOptions)` or similar could be a good way
forward? It's much better than a `boolean` parameter, aesthetically, and
it is extendable in the future if necessary.

Given that we would pass in an optional parameter, we might not even
need to deprecate the existing `commitTransaction()` method?



-Matthias

On 6/20/24 9:12 AM, Andrew Schofield wrote:
Hi Alieh,
Thanks for the KIP.

I *really* don’t like adding a config which changes the behaviour of the
flush() method. We already have too many configs. But I totally understand
the problem that you’re trying to solve, and some of the other suggestions
in this thread seem neater.

Personally, I would add another method to KafkaProducer. Not an overload
on flush(), because this is not flush() at all. Using Matthias’s options,
I prefer (3).

Thanks,
Andrew

On 20 Jun 2024, at 15:08, Lianet M. <liane...@gmail.com> wrote:

Hi all, thanks for the KIP Alieh!

LM1. Totally agree with Artem's point about the config not being the most
explicit/flexible way to express this capability. Getting then to Matthias's
4 options, what I don't like about 3 and 4 is that it seems they might not
age very well? Aren't we going to be wanting some other twist to the flush
semantics that will have us adding yet another param to it, or another
overloaded method? I truly don't have the context to answer that, but if it
feels like a realistic future, maybe adding some kind of FlushOptions param
to the flush would be better from an extensibility point of view. It would
only have the clearErrors option available for now but could accept any
other we may need. I find that this would remove the "ugliness" Matthias
pointed out for 3. and 4.

LM2. No matter how we end up expressing the different semantics for flush,
let's make sure we update the KIP on the flush and commitTransaction java
docs. It currently states that flush "clears the last exception" and
commitTransaction "will NOT throw" if called after flush, but it really all
depends on the config/options/method used.

LM3. I find it would be helpful to include an example to show the new flow
that we're unblocking (I see this as the great gain here): flush with the
clear-error option enabled -> catch and do whatever error handling we want
-> commitTransaction successfully.

Thanks!

Lianet

On Wed, Jun 19, 2024 at 11:26 PM Chris Egerton <
fearthecel...@gmail.com

wrote:

Hi Matthias,

I like the alternatives you've listed. One more that might help is if,
instead of overloading flush(), we overloaded commitTransaction() to
something like commitTransaction(boolean tolerateRecordErrors). This seems
slightly cleaner in that it takes the behavioral change we want, which only
applies to transactional producers, to an API method that is only used for
transactional producers. It would also avoid the issue of whether or not
flush() (or a new variant of it with altered semantics) should throw or not.
Thoughts?

Hi Alieh,

Thanks for the KIP, I like this direction a lot more than the
pluggable
handler!

I share Artem's concerns that enabling this behavior via configuration
doesn't seem like a great fit. It's likely that application code will be
written in a style that only works with one type of behavior from
transactional producers, so requiring that application code to declare its
expectations for the behavior of its producer seems more appropriate than,
e.g., allowing users deploying that application to tweak a configuration
file that gets fed to producers spun up inside it.

Cheers,

Chris

On Wed, Jun 19, 2024 at 10:32 PM Matthias J. Sax <
mj...@apache.org

wrote:

Thanks for the KIP Alieh. I actually like the KIP as-is, but think
Artem raises very good points...

Seems we have four options on how to move forward?

   1. add config to allow "silent error clearance" as the KIP proposes
   2. change flush() to clear error and let it throw
   3. add new `flushAndThrow()` (or better name) which clears error and throws
   4. add `flush(boolean clearAndThrow)` and let user pick (and deprecate existing `flush()`)

For (2), given that it would be a behavior change, we might also need a
public "feature flag" config.

It seems both (1) and (2) have the issue Artem mentioned. (3) and (4) would
be safer to this end; however, for both we kinda get an ugly API?

Not sure right now if I have any preference. Seems we need to pick some evil
and that there is no clear best solution? Would be good to hear from others
what they think.


-Matthias


On 6/18/24 8:39 PM, Artem Livshits wrote:
Hi Alieh,

Thank you for the KIP.  I have a couple of suggestions:

AL1.  We should throw an error from flush after we clear it.  This would
make it so that both "send + commit" and "send + flush + commit" (the latter
looks like just a more verbose way to express the former, and it would be
intuitive if it behaves the same) would throw if the transaction has an
error (so if the code is written either way it's going to be correct).  At
the same time, the latter could be extended by the caller to intercept
exceptions from flush, ignore as needed, and commit the transaction.  This
solution would keep basic things simple (if someone has code that doesn't
require advanced error handling, then basic "send + flush + commit" would do
the right thing) and advanced things possible: an application can add
try + catch around flush and ignore some errors.
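A rough, self-contained sketch of this AL1 behavior. The stub is a hypothetical stand-in for the real producer: flush() clears the error state but still rethrows, so plain "send + flush + commit" fails like "send + commit", while an advanced caller can catch the flush error and still commit.

```java
// Hypothetical stub illustrating "flush clears the error but throws it".
public class FlushThrowsSketch {

    static class StubTxProducer {
        RuntimeException lastSendError;
        boolean committed;

        void send(String value) {
            if (value.length() > 3) {
                lastSendError = new RuntimeException("RecordTooLargeException: " + value);
            }
        }

        void flush() {
            RuntimeException e = lastSendError;
            lastSendError = null;      // clear the error state ...
            if (e != null) {
                throw e;               // ... but still surface it to the caller
            }
        }

        void commitTransaction() {
            if (lastSendError != null) {
                throw lastSendError;
            }
            committed = true;
        }
    }

    public static void main(String[] args) {
        StubTxProducer producer = new StubTxProducer();
        producer.send("a");
        producer.send("not sure");
        try {
            producer.flush();
        } catch (RuntimeException e) {
            // Advanced caller: inspect the error, decide it is ignorable.
        }
        producer.commitTransaction();  // succeeds: flush already cleared the error
        System.out.println("committed=" + producer.committed);
    }
}
```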

AL2.  I'm not sure if config is the best way to express the modification of
the "flush" semantics -- the application logic that calls "flush" needs to
match the "flush" semantics, and configuring semantics in a detached place
creates room for bugs due to discrepancies.  This can be especially bad if
the producer loads configuration from a file at run time; in that case a
mistake in configuration could break the application because it was written
to expect one "flush" semantics but the semantics is switched.  Given that
the "flush" semantics needs to match the caller's expectation, a way to
accomplish that would be to pass the caller's expectation to the "flush"
call by either having a method with a different name or having an overload
with a Boolean flag that would configure the semantics (the current method
could just redirect to the new one).

-Artem

On Mon, Jun 17, 2024 at 9:09 AM Alieh Saeedi
<asae...@confluent.io.invalid>
wrote:

Hi all,

I'd like to kick off a discussion for KIP-1059 that suggests adding a new
feature to the Producer flush() method.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-1059%3A+Enable+the+Producer+flush%28%29+method+to+clear+the+latest+send%28%29+error

Cheers,
Alieh












