Thanks Jason for the thoughts.

On Wed, Jan 27, 2021 at 11:52 AM Jason Gustafson <ja...@confluent.io> wrote:

> Hi Boyang,
>
> Thanks for the iterations here. I think this is something we should have
> done a long time ago. It sounds like there are two API changes here:
>
> 1. We are introducing the `CommitFailedException` to wrap abortable
> errors that are raised from `commitTransaction`. This sounds fine to me. As
> far as I know, the only case we might need this is when we add support to
> let producers recover from coordinator timeouts. Are there any others?
>
I think the purpose here is to unify non-fatal exceptions under a single
exception umbrella, which makes it much easier to proceed to aborting any
ongoing transaction. I don't think coordinator timeouts are the only case to
recover from here, since we have other non-fatal exceptions such as UnknownPid.

> 2. We are wrapping non-fatal errors raised from `send` as `KafkaException`.
> The motivation for this is less clear to me and it doesn't look like the
> example from the KIP depends on it. My concern here is compatibility.
> Currently we have the following documentation for the `Callback` API:
>
> ```
>      *                  Non-Retriable exceptions (fatal, the message will
> never be sent):
>      *
>      *                  InvalidTopicException
>      *                  OffsetMetadataTooLargeException
>      *                  RecordBatchTooLargeException
>      *                  RecordTooLargeException
>      *                  UnknownServerException
>      *                  UnknownProducerIdException
>      *                  InvalidProducerEpochException
>      *
>      *                  Retriable exceptions (transient, may be covered by
> increasing #.retries):
>      *
>      *                  CorruptRecordException
>      *                  InvalidMetadataException
>      *                  NotEnoughReplicasAfterAppendException
>      *                  NotEnoughReplicasException
>      *                  OffsetOutOfRangeException
>      *                  TimeoutException
>      *                  UnknownTopicOrPartitionException
> ```
>
> If we wrap all the retriable exceptions documented here as
> `KafkaException`, wouldn't that break any error handling that users might
> already have? That would introduce a compatibility issue.
>
The original intention was to simplify `send` callback error handling through
exception wrapping: on the Streams level we have to maintain an exhaustive list
of exceptions to catch as fatal, and an equally lengthy list to catch as
non-fatal. It would be much easier if we got hints from the callback. However,
I agree there is a compatibility concern. What about deprecating the existing:

void onCompletion(RecordMetadata metadata, Exception exception)

and replacing it with:

default void onCompletion(RecordMetadata metadata, Exception exception, boolean isFatal) {
  this.onCompletion(metadata, exception);
}

so that new users can tell whether an error is fatal from the information
provided by the producer?
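
To make the idea concrete, here is a minimal sketch of how such an overload
could behave. It is only an illustration under assumptions: `RecordMetadata`
below is an empty stand-in for the real class, and `CallbackSketch` and `demo`
are made-up names, not part of the producer API.

```java
import java.util.ArrayList;
import java.util.List;

public class CallbackSketch {
    // Hypothetical stand-in for org.apache.kafka.clients.producer.RecordMetadata.
    static final class RecordMetadata { }

    // Sketch of the proposed Callback shape; an assumption, not the shipped interface.
    interface Callback {
        @Deprecated
        void onCompletion(RecordMetadata metadata, Exception exception);

        // New overload: the default drops the flag and delegates, so existing
        // implementations (including lambdas) keep working unchanged.
        default void onCompletion(RecordMetadata metadata, Exception exception, boolean isFatal) {
            this.onCompletion(metadata, exception);
        }
    }

    public static List<String> demo() {
        List<String> events = new ArrayList<>();

        // Old-style callback: only implements the two-argument method.
        Callback legacy = (metadata, exception) -> events.add("legacy: " + exception.getMessage());

        // New-style callback: overrides the overload to branch on fatality.
        Callback fatalityAware = new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                // Without a hint, conservatively treat the error as fatal.
                onCompletion(metadata, exception, true);
            }

            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception, boolean isFatal) {
                events.add((isFatal ? "fatal: " : "abortable: ") + exception.getMessage());
            }
        };

        // Simulate the producer completing a send with a non-fatal error.
        Exception error = new IllegalStateException("send failed");
        legacy.onCompletion(null, error, false);
        fatalityAware.onCompletion(null, error, false);
        return events;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The point of the default method is that old callbacks never see the flag and
behave exactly as before, while new callbacks can opt in by overriding the
three-argument method.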

> Thanks,
> Jason
>
>
> On Sat, Jan 23, 2021 at 3:31 AM Hiringuru <i...@hiringuru.com> wrote:
>
> > Why  we are receiving all emails kindly remove us from
> > dev@kafka.apache.org we don't want to receive emails anymore.
> >
> > Thanks
> > > On 01/23/2021 4:14 AM Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > >
> > > > Thanks Boyang, yes I think I was confused about the different handling of
> > > > two abortTxn calls, and now I get it was not intentional. I think I do not
> > > > have more concerns.
> > >
> > > On Fri, Jan 22, 2021 at 1:12 PM Boyang Chen <
> reluctanthero...@gmail.com>
> > > wrote:
> > >
> > > > > Thanks for the clarification Guozhang, I got your point that we want to
> > > > > have a consistent handling of fatal exceptions being thrown from the
> > > > > abortTxn. I modified the current template to move the fatal exception
> > > > > try-catch outside of the processing loop to make sure we could get a chance
> > > > > to close consumer/producer modules. Let me know what you think.
> > > >
> > > > Best,
> > > > Boyang
> > > >
> > > > On Fri, Jan 22, 2021 at 11:05 AM Boyang Chen <
> > reluctanthero...@gmail.com>
> > > > wrote:
> > > >
> > > > > > My understanding is that abortTransaction would only throw when the
> > > > > > producer is in fatal state. For CommitFailed, the producer should still be
> > > > > > in the abortable error state, so that abortTransaction call would not throw.
> > > > >
> > > > > On Fri, Jan 22, 2021 at 11:02 AM Guozhang Wang <wangg...@gmail.com
> >
> > > > wrote:
> > > > >
> > > > >> Or are you going to maintain some internal state such that, the
> > > > >> `abortTransaction` in the catch block would never throw again?
> > > > >>
> > > > >> On Fri, Jan 22, 2021 at 11:01 AM Guozhang Wang <
> wangg...@gmail.com>
> > > > >> wrote:
> > > > >>
> > > > >> > Hi Boyang/Jason,
> > > > >> >
> > > > > >> > I've also thought about this (i.e. using CommitFailed for all non-fatal),
> > > > > >> > but what I'm pondering is: in the catch (CommitFailed) block, what
> > > > > >> > would happen if `producer.abortTransaction();` throws again? Should
> > > > > >> > that be captured as fatal and cause the client to close?
> > > > >> >
> > > > >> > If yes, then naively the pattern would be:
> > > > >> >
> > > > >> > ...
> > > > > >> > catch (CommitFailedException e) {
> > > > > >> >         // Transaction commit failed with abortable error, user could reset
> > > > > >> >         // the application state and resume with a new transaction. The root
> > > > > >> >         // cause was wrapped in the thrown exception.
> > > > > >> >         resetToLastCommittedPositions(consumer);
> > > > > >> >         try {
> > > > > >> >             producer.abortTransaction();
> > > > > >> >         } catch (KafkaException fatal) {
> > > > > >> >             producer.close();
> > > > > >> >             consumer.close();
> > > > > >> >             throw fatal;
> > > > > >> >         }
> > > > > >> >     } catch (KafkaException e) {
> > > > > >> >         producer.close();
> > > > > >> >         consumer.close();
> > > > > >> >         throw e;
> > > > > >> >     }
> > > > >> > ...
> > > > >> >
> > > > >> > Guozhang
> > > > >> >
> > > > >> > On Fri, Jan 22, 2021 at 10:47 AM Boyang Chen <
> > > > >> reluctanthero...@gmail.com>
> > > > >> > wrote:
> > > > >> >
> > > > >> >> Hey Guozhang,
> > > > >> >>
> > > > > >> >> Jason and I were discussing the new API offline and decided to take another
> > > > > >> >> approach. Firstly, the reason not to invent a new API with a returned boolean
> > > > > >> >> flag is compatibility: old EOS code would not know that a given transaction
> > > > > >> >> commit had failed internally, as it doesn't listen to the output flag. Our
> > > > > >> >> proposed alternative solution is to let *commitTransaction throw
> > > > > >> >> CommitFailedException whenever the commit failed with non-fatal
> > > > > >> >> exception*, so that on the caller side the handling logic becomes:
> > > > >> >> try {
> > > > >> >>         if (shouldCommit) {
> > > > >> >>             producer.commitTransaction();
> > > > >> >>         } else {
> > > > >> >>             resetToLastCommittedPositions(consumer);
> > > > >> >>             producer.abortTransaction();
> > > > >> >>         }
> > > > >> >>     } catch (CommitFailedException e) {
> > > > >> >>         // Transaction commit failed with abortable error, user
> > could
> > > > >> >> reset
> > > > >> >>         // the application state and resume with a new
> > transaction.
> > > > The
> > > > >> >> root
> > > > >> >>         // cause was wrapped in the thrown exception.
> > > > >> >>         resetToLastCommittedPositions(consumer);
> > > > >> >>         producer.abortTransaction();
> > > > >> >>     } catch (KafkaException e) {
> > > > >> >>         producer.close();
> > > > >> >>         consumer.close();
> > > > >> >>         throw e;
> > > > >> >>     }
> > > > >> >>
> > > > > >> >> This approach looks cleaner as all exception types other than CommitFailed
> > > > > >> >> are bound to be fatal, which is very easy for users to adopt. In the
> > > > > >> >> meantime, we still maintain the commitTxn behavior to throw instead of
> > > > > >> >> silently failing.
> > > > >> >>
> > > > > >> >> In addition, we decided to drop the recommendation to handle
> > > > > >> >> TimeoutException and leave it to the users to make the call. The downside
> > > > > >> >> of blindly calling abortTxn upon timeout is that we could end up in an
> > > > > >> >> illegal state when the commit was already successful on the broker
> > > > > >> >> side. Without a good guarantee on the outcome, overcomplicating the
> > > > > >> >> template should not be encouraged IMHO.
> > > > >> >>
> > > > >> >> Let me know your thoughts on the new approach here, thank you!
> > > > >> >>
> > > > >> >> Best,
> > > > >> >> Boyang
> > > > >> >>
> > > > >> >> On Tue, Jan 19, 2021 at 11:11 AM Guozhang Wang <
> > wangg...@gmail.com>
> > > > >> >> wrote:
> > > > >> >>
> > > > >> >> > Thanks for your clarification on 2)/3), that makes sense.
> > > > >> >> >
> > > > >> >> > On Tue, Jan 19, 2021 at 10:16 AM Boyang Chen <
> > > > >> >> reluctanthero...@gmail.com>
> > > > >> >> > wrote:
> > > > >> >> >
> > > > >> >> > > Thanks for the input Guozhang, replied inline.
> > > > >> >> > >
> > > > >> >> > > On Mon, Jan 18, 2021 at 8:57 PM Guozhang Wang <
> > > > wangg...@gmail.com>
> > > > >> >> > wrote:
> > > > >> >> > >
> > > > >> >> > > > Hello Boyang,
> > > > >> >> > > >
> > > > > >> >> > > > Thanks for the updated KIP. I read it again and have the following
> > > > > >> >> > > > thoughts:
> > > > >> >> > > >
> > > > > >> >> > > > 0. I'm a bit concerned that if commitTxn does not throw any non-fatal
> > > > > >> >> > > > exception, and instead we rely on the subsequent beginTxn call to throw, it
> > > > > >> >> > > > may break callers whose pattern relies on commitTxn succeeding before
> > > > > >> >> > > > making some non-rollback operations. For example:
> > > > >> >> > > > beginTxn()
> > > > >> >> > > > // do some read-write on my local DB
> > > > >> >> > > > commitTxn()
> > > > >> >> > > > // if commitTxn succeeds, then commit the DB
> > > > >> >> > > >
> > > > >> >> > > > -------------
> > > > >> >> > > >
> > > > > >> >> > > > The issue is that committing the DB is a non-rollback operation, and users may
> > > > > >> >> > > > just rely on commitTxn to return without error to make this non-rollback
> > > > > >> >> > > > call. Of course we can just claim this pattern is not legitimate and is not
> > > > > >> >> > > > the right way of doing things, but many users may naturally adopt this
> > > > > >> >> > > > pattern.
> > > > >> >> > > >
> > > > > >> >> > > > So maybe we should still let commitTxn also throw non-fatal exceptions, in
> > > > > >> >> > > > which case we would then call abortTxn again.
> > > > >> >> > > >
> > > > >> >> > > > But if we do this, the pattern becomes:
> > > > >> >> > > >
> > > > >> >> > > > try {
> > > > >> >> > > >    beginTxn()
> > > > >> >> > > >    // do something
> > > > >> >> > > > } catch (Exception) {
> > > > >> >> > > >    shouldCommit = false;
> > > > >> >> > > > }
> > > > >> >> > > >
> > > > >> >> > > > if (shouldCommit) {
> > > > >> >> > > >     try {
> > > > >> >> > > >         commitTxn()
> > > > >> >> > > >     } catch (...) {        // enumerate all fatal
> > exceptions
> > > > >> >> > > >         shutdown()
> > > > >> >> > > >     } catch (KafkaException) {
> > > > >> >> > > >         // non-fatal
> > > > >> >> > > >         shouldCommit = false;
> > > > >> >> > > >     }
> > > > >> >> > > > }
> > > > >> >> > > >
> > > > >> >> > > > if (!shouldCommit && running()) {
> > > > > >> >> > > >     try {
> > > > >> >> > > >         abortTxn()
> > > > >> >> > > >     } catch (KafkaException) {
> > > > >> >> > > >         // only throw fatal
> > > > >> >> > > >         shutdown()
> > > > >> >> > > >     }
> > > > >> >> > > > }
> > > > >> >> > > >
> > > > >> >> > > > ---------------------
> > > > >> >> > > >
> > > > >> >> > > > Which is much more complicated.
> > > > >> >> > > >
> > > > > >> >> > > > That makes me think the alternative we have discussed offline may be
> > > > > >> >> > > > better: let commitTxn() return a boolean flag.
> > > > >> >> > > >
> > > > > >> >> > > > * If it returns true, it means the commit succeeded. Users can comfortably
> > > > > >> >> > > > continue and do any external non-rollback operations if they like.
> > > > > >> >> > > > * If it returns false, it means the commit failed with non-fatal error *AND
> > > > > >> >> > > > the txn has been aborted*. Users do not need to call abortTxn again.
> > > > > >> >> > > > * If it throws, then it means fatal errors. Users should shut down the
> > > > > >> >> > > > client.
> > > > >> >> > > >
> > > > >> >> > > > In this case, the pattern becomes:
> > > > >> >> > > >
> > > > >> >> > > > try {
> > > > >> >> > > >    beginTxn()
> > > > >> >> > > >    // do something
> > > > >> >> > > > } catch (Exception) {
> > > > >> >> > > >    shouldCommit = false;
> > > > >> >> > > > }
> > > > >> >> > > >
> > > > >> >> > > > try {
> > > > >> >> > > >     if (shouldCommit) {
> > > > >> >> > > >         commitSucceeded = commitTxn()
> > > > >> >> > > >     } else {
> > > > >> >> > > >         // reset offsets, rollback operations, etc
> > > > >> >> > > >         abortTxn()
> > > > >> >> > > >     }
> > > > >> >> > > > } catch (KafkaException) {
> > > > >> >> > > >     // only throw fatal
> > > > >> >> > > >     shutdown()
> > > > >> >> > > > }
> > > > >> >> > > >
> > > > >> >> > > > if (commitSucceeded)
> > > > >> >> > > >    // do other non-rollbackable things
> > > > >> >> > > > else
> > > > >> >> > > >    // reset offsets, rollback operations, etc
> > > > >> >> > > >
> > > > >> >> > > > -------------------------
> > > > >> >> > > >
> > > > > >> >> > > > Of course, if we want better visibility into what caused the commit
> > > > > >> >> > > > to fail and the txn to abort, we can let the return type be an enum instead of
> > > > > >> >> > > > a flag. But my main idea is to still let commitTxn be the final point where
> > > > > >> >> > > > users can tell whether this txn succeeded or not, instead of relying on the
> > > > > >> >> > > > next beginTxn() call.
> > > > >> >> > > >
> > > > > >> >> > > I agree that adding a boolean flag is indeed useful in this case. Will
> > > > > >> >> > > update the KIP.
> > > > >> >> > >
> > > > > >> >> > > > 1. Re: "while maintaining the behavior to throw fatal exception in raw":
> > > > > >> >> > > > not sure what you mean by "throw" here. Are you proposing the callback would
> > > > > >> >> > > > *pass* (not throw) in any fatal exceptions when triggered, without wrapping?
> > > > >> >> > > >
> > > > > >> >> > > Yes, I meant *pass*; the benefit is to keep the end user's expectation
> > > > > >> >> > > consistent regarding exception handling.
> > > > >> >> > >
> > > > >> >> > >
> > > > > >> >> > > > 2. I'm not sure I understand the claim regarding the callback that "In EOS
> > > > > >> >> > > > setup, it is not required to handle the exception". Are you proposing that,
> > > > > >> >> > > > e.g. in Streams, we do not try to handle any exceptions in the callback
> > > > > >> >> > > > anymore if EOS is enabled, but just let commitTxn() itself fail to be
> > > > > >> >> > > > notified about the problem? Personally I think in Streams we should just
> > > > > >> >> > > > make the handling logic of the callback consistent regardless of the
> > > > > >> >> > > > EOS settings (today we have different logic depending on this setting, which
> > > > > >> >> > > > I think could be unified as well).
> > > > >> >> > > >
> > > > > >> >> > > My idea originates from the claim on the send API:
> > > > > >> >> > > "When used as part of a transaction, it is not necessary to define a
> > > > > >> >> > > callback or check the result of the future in order to detect errors from
> > > > > >> >> > > <code>send</code>."
> > > > > >> >> > > My understanding is that for EOS, the exception will be detected by calling
> > > > > >> >> > > transactional APIs either way, so tracking all the sendExceptions in
> > > > > >> >> > > RecordCollector is duplicate handling. However, I looked up how
> > > > > >> >> > > sendException is being used today:
> > > > >> >> > >
> > > > > >> >> > > 1. Pass to "ProductionExceptionHandler" for any default or customized
> > > > > >> >> > >    exception handler to handle
> > > > > >> >> > > 2. Stop collecting offset info or new exceptions
> > > > > >> >> > > 3. Check and rethrow exceptions in close(), flush() or new send() calls
> > > > >> >> > >
> > > > > >> >> > > To my understanding, we should still honor the commitment to #1 for any
> > > > > >> >> > > user-customized implementation. #2 does not really affect our decision
> > > > > >> >> > > on EOS. #3 is still valuable as it could help us fail fast in a
> > > > > >> >> > > new send() instead of waiting until a later stage of processing. In that
> > > > > >> >> > > sense, I agree we should continue to record send exceptions even under the
> > > > > >> >> > > EOS case to ensure the robustness of the Streams-side producer logic. On the
> > > > > >> >> > > safer side, we no longer need to wrap certain fatal exceptions like
> > > > > >> >> > > ProducerFenced as TaskMigrated, since they should not crash the stream thread
> > > > > >> >> > > if thrown in raw format, once we adopt the new processing model in the send
> > > > > >> >> > > phase.
> > > > >> >> > >
> > > > >> >> > >
> > > > >> >> > > >
> > > > >> >> > > > Guozhang
> > > > >> >> > > >
> > > > >> >> > > >
> > > > >> >> > > >
> > > > >> >> > > > On Thu, Dec 17, 2020 at 8:42 PM Boyang Chen <
> > > > >> >> > reluctanthero...@gmail.com>
> > > > >> >> > > > wrote:
> > > > >> >> > > >
> > > > > >> >> > > > > Thanks for everyone's feedback so far. I have polished the KIP after
> > > > > >> >> > > > > offline discussion with some folks working on EOS to make the exception
> > > > > >> >> > > > > handling more lightweight. The essential change is that we are not
> > > > > >> >> > > > > inventing a new intermediate exception type, but instead separating a full
> > > > > >> >> > > > > transaction into two phases:
> > > > >> >> > > > >
> > > > >> >> > > > > 1. The data transmission phase
> > > > >> >> > > > > 2. The commit phase
> > > > >> >> > > > >
> > > > > >> >> > > > > This way, any exception thrown from phase 1 will be an indicator in
> > > > > >> >> > > > > phase 2 of whether we should commit or abort, and from now on
> > > > > >> >> > > > > `commitTransaction` should only throw fatal exceptions, similar to
> > > > > >> >> > > > > `abortTransaction`, so that any KafkaException caught in the commit phase
> > > > > >> >> > > > > is definitely fatal and should crash the app. For more advanced users such as
> > > > > >> >> > > > > Streams, we have the ability to further wrap selected types of fatal
> > > > > >> >> > > > > exceptions to trigger task migration if necessary.
> > > > >> >> > > > >
> > > > >> >> > > > > More details in the KIP, feel free to take another
> look,
> > > > >> thanks!
> > > > >> >> > > > >
> > > > >> >> > > > > On Thu, Dec 17, 2020 at 8:36 PM Boyang Chen <
> > > > >> >> > > reluctanthero...@gmail.com>
> > > > >> >> > > > > wrote:
> > > > >> >> > > > >
> > > > >> >> > > > > > Thanks Bruno for the feedback.
> > > > >> >> > > > > >
> > > > >> >> > > > > > On Mon, Dec 7, 2020 at 5:26 AM Bruno Cadonna <
> > > > >> >> br...@confluent.io>
> > > > >> >> > > > wrote:
> > > > >> >> > > > > >
> > > > >> >> > > > > >> Thanks Boyang for the KIP!
> > > > >> >> > > > > >>
> > > > > >> >> > > > > >> Like Matthias, I do not know the producer internals well enough to
> > > > > >> >> > > > > >> comment on the categorization. However, I think having a super exception
> > > > > >> >> > > > > >> (e.g. RetriableException) that encodes whether an exception is fatal or not
> > > > > >> >> > > > > >> is cleaner, easier to understand and less error-prone, because ideally
> > > > > >> >> > > > > >> when you add a new non-fatal exception in the future you just need to think
> > > > > >> >> > > > > >> about letting it inherit from the super exception, and all the rest of
> > > > > >> >> > > > > >> the code will just behave correctly without the need to wrap the new
> > > > > >> >> > > > > >> exception into another exception each time it is thrown (maybe it is
> > > > > >> >> > > > > >> thrown at different locations in the code).
> > > > >> >> > > > > >>
> > > > > >> >> > > > > >> As far as I understand, the following statement from your previous e-mail
> > > > > >> >> > > > > >> is the reason that currently such a super exception is not possible:
> > > > >> >> > > > > >>
> > > > >> >> > > > > >> "Right now we have RetriableException type, if we
> are
> > > > going
> > > > >> to
> > > > >> >> > add a
> > > > >> >> > > > > >> `ProducerRetriableException` type, we have to put
> > this new
> > > > >> >> > interface
> > > > >> >> > > > as
> > > > >> >> > > > > >> the parent of the RetriableException, because not
> all
> > > > thrown
> > > > >> >> > > non-fatal
> > > > >> >> > > > > >> exceptions are `retriable` in general for producer"
> > > > >> >> > > > > >>
> > > > >> >> > > > > >>
> > > > > >> >> > > > > >> In the list of exceptions in your KIP, I found non-fatal exceptions that
> > > > > >> >> > > > > >> do not inherit from RetriableException. I guess those are the ones you
> > > > > >> >> > > > > >> are referring to in your statement:
> > > > >> >> > > > > >>
> > > > >> >> > > > > >> InvalidProducerEpochException
> > > > >> >> > > > > >> InvalidPidMappingException
> > > > >> >> > > > > >> TransactionAbortedException
> > > > >> >> > > > > >>
> > > > > >> >> > > > > >> All of those exceptions are non-fatal and do not inherit from
> > > > > >> >> > > > > >> RetriableException. Is there a reason for that? If they inherited from
> > > > > >> >> > > > > >> RetriableException we would be a bit closer to the super exception I
> > > > > >> >> > > > > >> mention above.
> > > > >> >> > > > > >>
> > > > > >> >> > > > > > The reason is that the sender may catch those exceptions in the
> > > > > >> >> > > > > > ProduceResponse, and it currently retries indefinitely on
> > > > > >> >> > > > > > RetriableException. To make sure we can trigger
> > > > > >> >> > > > > > abortTransaction() by catching thrown non-fatal
> > > > > >> >> > > > > > exceptions and reinitialize the txn state, we chose not to let those
> > > > > >> >> > > > > > exceptions inherit RetriableException so that
> > > > > >> >> > > > > > they won't cause infinite retries on the sender.
> > > > >> >> > > > > >
> > > > >> >> > > > > >
> > > > > >> >> > > > > >> With OutOfOrderSequenceException and UnknownProducerIdException, I think
> > > > > >> >> > > > > >> I understand that their fatality depends on the type (i.e.
> > > > > >> >> > > > > >> configuration) of the producer. That makes it difficult to have a super
> > > > > >> >> > > > > >> exception that encodes the retriability as mentioned above. Would it be
> > > > > >> >> > > > > >> possible to introduce exceptions that inherit from RetriableException
> > > > > >> >> > > > > >> and that are thrown when those exceptions are caught from the brokers
> > > > > >> >> > > > > >> and the type of the producer is such that the exceptions are retriable?
> > > > >> >> > > > > >>
> > > > > >> >> > > > > > Yeah, I think in general it is difficult to get the exception type
> > > > > >> >> > > > > > mixing right. I have already proposed another solution based on my
> > > > > >> >> > > > > > offline discussion with some folks working on EOS
> > > > > >> >> > > > > > to make the handling more straightforward for end users without the need
> > > > > >> >> > > > > > to distinguish exception fatality.
> > > > >> >> > > > > >
> > > > >> >> > > > > >> Best,
> > > > >> >> > > > > >> Bruno
> > > > >> >> > > > > >>
> > > > >> >> > > > > >>
> > > > >> >> > > > > >> On 04.12.20 19:34, Guozhang Wang wrote:
> > > > > >> >> > > > > >> > Thanks Boyang for the proposal! I made a pass over the list and here are
> > > > > >> >> > > > > >> > some thoughts:
> > > > >> >> > > > > >> >
> > > > > >> >> > > > > >> > 0) Although this is not part of the public API, I think we should make sure
> > > > > >> >> > > > > >> > that the suggested pattern (i.e. user can always call abortTxn() when
> > > > > >> >> > > > > >> > handling non-fatal errors) is indeed supported. E.g. if the txn is already
> > > > > >> >> > > > > >> > aborted by the broker side, then users can still call abortTxn, which would
> > > > > >> >> > > > > >> > not throw another exception but just be treated as a no-op.
> > > > >> >> > > > > >> >
> > > > > >> >> > > > > >> > 1) *ConcurrentTransactionsException*: I think this error can also be
> > > > > >> >> > > > > >> > returned but is not documented yet. This should be a non-fatal error.
> > > > >> >> > > > > >> >
> > > > > >> >> > > > > >> > 2) *InvalidTxnStateException*: this error is returned from the broker when a
> > > > > >> >> > > > > >> > txn state transition failed (e.g. it is trying to transition to
> > > > > >> >> > > > > >> > complete-commit while the current state is not prepare-commit). This error
> > > > > >> >> > > > > >> > could indicate a bug in the client internal code or the broker code, OR a
> > > > > >> >> > > > > >> > user error --- a similar error is ConcurrentTransactionsException, i.e. if
> > > > > >> >> > > > > >> > Kafka is bug-free these exceptions would only be returned if users try to do
> > > > > >> >> > > > > >> > something wrong, e.g. calling abortTxn right after a commitTxn, etc. So I'm
> > > > > >> >> > > > > >> > thinking it should be a non-fatal error instead of a fatal error, wdyt?
> > > > >> >> > > > > >> >
> > > > > >> >> > > > > >> > 3) *KafkaException*: case i "indicates fatal transactional sequence
> > > > > >> >> > > > > >> > (Fatal)", this is a bit conflicting with the *OutOfOrderSequenceException*
> > > > > >> >> > > > > >> > that is treated as non-fatal. I guess your proposal is that
> > > > > >> >> > > > > >> > OutOfOrderSequenceException would be treated either as fatal with
> > > > > >> >> > > > > >> > transactional producer, or non-fatal with idempotent producer, is that
> > > > > >> >> > > > > >> > right? If the producer is only configured with idempotency but not
> > > > > >> >> > > > > >> > transaction, then throwing a TransactionStateCorruptedException for
> > > > > >> >> > > > > >> > non-fatal errors would be confusing since users are not using transactions
> > > > > >> >> > > > > >> > at all. So I suggest we always throw OutOfOrderSequenceException as-is (i.e.
> > > > > >> >> > > > > >> > not wrapped) no matter how the producer is configured, and let the caller
> > > > > >> >> > > > > >> > decide how to handle it based on whether it is only idempotent or
> > > > > >> >> > > > > >> > transactional itself.
> > > > >> >> > > > > >> >
> > > > > >> >> > > > > >> > 4) Besides all the txn APIs, the `send()` callback / future can also throw
> > > > > >> >> > > > > >> > txn-related exceptions. I think this KIP should cover this API as well?
> > > > >> >> > > > > >> >
> > > > > >> >> > > > > >> > 5) This is related to 1/2) above: sometimes those non-fatal errors like
> > > > > >> >> > > > > >> > ConcurrentTxn or InvalidTxnState are not due to the state being corrupted
> > > > > >> >> > > > > >> > at the broker side, but maybe users are doing something wrong. So I'm
> > > > > >> >> > > > > >> > wondering if we should further distinguish those non-fatal errors between
> > > > > >> >> > > > > >> > a) those that are caused by Kafka itself, e.g. a broker timed out and
> > > > > >> >> > > > > >> > aborted a txn and later an endTxn request is received, and b) those where
> > > > > >> >> > > > > >> > the user's API call pattern is incorrect, causing the request to be rejected
> > > > > >> >> > > > > >> > with an error code from the broker. *TransactionStateCorruptedException*
> > > > > >> >> > > > > >> > feels to me more like for case a), but not case b).
> > > > >> >> > > > > >> >
> > > > >> >> > > > > >> >
> > > > >> >> > > > > >> > Guozhang
> > > > >> >> > > > > >> >
> > > > >> >> > > > > >> >
> > > > >> >> > > > > >> > On Wed, Dec 2, 2020 at 4:50 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
> > > > >> >> > > > > >> >
> > > > >> >> > > > > >> >> Thanks Matthias, I think your proposal makes sense as well. On the pro
> > > > >> >> > > > > >> >> side, we could have a universally agreed exception type to be caught by
> > > > >> >> > > > > >> >> the user, without having an extra layer on top of the actual exceptions.
> > > > >> >> > > > > >> >> I could see some issues on the downside:
> > > > >> >> > > > > >> >>
> > > > >> >> > > > > >> >> 1. The exception hierarchy will be more complex. Right now we have the
> > > > >> >> > > > > >> >> RetriableException type. If we are going to add a
> > > > >> >> > > > > >> >> `ProducerRetriableException` type, we have to make this new interface the
> > > > >> >> > > > > >> >> parent of RetriableException, because not all thrown non-fatal
> > > > >> >> > > > > >> >> exceptions are `retriable` in general for the producer, for example
> > > > >> >> > > > > >> >> <https://github.com/apache/kafka/blob/e275742f850af4a1b79b0d1bd1ac9a1d2e89c64e/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L745>.
> > > > >> >> > > > > >> >> We could have an empty interface `ProducerRetriableException` for all
> > > > >> >> > > > > >> >> the thrown exceptions to implement, but it's a bit unorthodox in the
> > > > >> >> > > > > >> >> way we deal with exceptions in general.
> > > > >> >> > > > > >> >>
> > > > >> >> > > > > >> >> 2. There are cases where we throw a KafkaException wrapping another
> > > > >> >> > > > > >> >> KafkaException as either fatal or non-fatal. If we use an interface to
> > > > >> >> > > > > >> >> solve #1, it is also required to implement another bloated exception class
> > > > >> >> > > > > >> >> which could replace KafkaException type, as we couldn't mark KafkaException
> > > > >> >> > > > > >> >> as retriable for sure.
> > > > >> >> > > > > >> >>
> > > > >> >> > > > > >> >> 3. In terms of encapsulation, wrapping means we could limit the scope
> > > > >> >> > > > > >> >> of impact to the producer only, which is important since we don't want
> > > > >> >> > > > > >> >> shared exception types, such as UnknownTopicOrPartitionException, to
> > > > >> >> > > > > >> >> implement a producer-related interface.
> > > > >> >> > > > > >> >>
> > > > >> >> > > > > >> >> Best,
> > > > >> >> > > > > >> >> Boyang
> > > > >> >> > > > > >> >>
> > > > >> >> > > > > >> >> On Wed, Dec 2, 2020 at 3:38 PM Matthias J. Sax <mj...@apache.org> wrote:
> > > > >> >> > > > > >> >>
> > > > >> >> > > > > >> >>> Thanks for the KIP Boyang!
> > > > >> >> > > > > >> >>>
> > > > >> >> > > > > >> >>> Overall, categorizing exceptions makes a lot of sense. As I don't know
> > > > >> >> > > > > >> >>> the producer internals well enough, I cannot comment on the
> > > > >> >> > > > > >> >>> categorization in detail though.
> > > > >> >> > > > > >> >>>
> > > > >> >> > > > > >> >>> What I am wondering is whether we should introduce an exception interface
> > > > >> >> > > > > >> >>> that non-fatal exceptions would implement, instead of creating a new
> > > > >> >> > > > > >> >>> class that will wrap non-fatal exceptions. What would be the pros/cons
> > > > >> >> > > > > >> >>> of both designs?
> > > > >> >> > > > > >> >>>
> > > > >> >> > > > > >> >>>
> > > > >> >> > > > > >> >>> -Matthias
> > > > >> >> > > > > >> >>>
> > > > >> >> > > > > >> >>>
> > > > >> >> > > > > >> >>> On 12/2/20 11:35 AM, Boyang Chen wrote:
> > > > >> >> > > > > >> >>>> Hey there,
> > > > >> >> > > > > >> >>>>
> > > > >> >> > > > > >> >>>> I would like to start a discussion thread for KIP-691:
> > > > >> >> > > > > >> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-691%3A+Enhance+Transactional+Producer+Exception+Handling
> > > > >> >> > > > > >> >>>>
> > > > >> >> > > > > >> >>>> The KIP aims to simplify the exception handling logic for
> > > > >> >> > > > > >> >>>> transactional Producer users by classifying fatal and non-fatal
> > > > >> >> > > > > >> >>>> exceptions and throwing them correspondingly for easier catch and
> > > > >> >> > > > > >> >>>> retry. Let me know what you think.
> > > > >> >> > > > > >> >>>>
> > > > >> >> > > > > >> >>>> Best,
> > > > >> >> > > > > >> >>>> Boyang
> > > > >> >> > > > > >> >>>>
>
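
For readers skimming the quoted thread, the two designs being compared (an empty marker interface vs. a producer-only wrapper exception) can be sketched roughly as below. This is only an illustration under stated assumptions, not code from the KIP or from Kafka: `KafkaException` here is a simplified stand-in for the real class, and `TimeoutLikeException`, `SharedException` and `AbortableWrapperException` are hypothetical names made up for the sketch. Only `ProducerRetriableException` is a name that appears in the discussion.

```java
public class ExceptionDesignSketch {

    // Simplified stand-in for org.apache.kafka.common.KafkaException (assumption).
    static class KafkaException extends RuntimeException {
        KafkaException(String message) { super(message); }
        KafkaException(String message, Throwable cause) { super(message, cause); }
    }

    // Design A: an empty marker interface implemented by each non-fatal exception.
    interface ProducerRetriableException { }

    // Hypothetical non-fatal exception that opts in to the marker interface.
    static class TimeoutLikeException extends KafkaException
            implements ProducerRetriableException {
        TimeoutLikeException(String message) { super(message); }
    }

    // Hypothetical exception type shared with non-producer code; it should not
    // have to implement a producer-specific interface (downside 3 in the thread).
    static class SharedException extends KafkaException {
        SharedException(String message) { super(message); }
    }

    // Design B: a producer-only wrapper class; wrapped types stay untouched.
    static class AbortableWrapperException extends KafkaException {
        AbortableWrapperException(String message, Throwable cause) { super(message, cause); }
    }

    // Design A check: the thrown object itself must carry the marker.
    static boolean isNonFatalByInterface(Throwable t) {
        return t instanceof ProducerRetriableException;
    }

    // Design B check: the producer decides at throw time by choosing the wrapper.
    static boolean isNonFatalByWrapper(Throwable t) {
        return t instanceof AbortableWrapperException;
    }

    public static void main(String[] args) {
        Throwable marked = new TimeoutLikeException("request timed out");
        // A plain KafkaException wrapping a marked one loses the marker (downside 2).
        Throwable rewrapped = new KafkaException("wrapped", marked);
        Throwable wrappedShared =
            new AbortableWrapperException("abortable", new SharedException("unknown topic"));

        System.out.println("marked, by interface:     " + isNonFatalByInterface(marked));
        System.out.println("re-wrapped, by interface: " + isNonFatalByInterface(rewrapped));
        System.out.println("wrapped shared, by wrapper: " + isNonFatalByWrapper(wrappedShared));
    }
}
```

With the marker interface, the caller's check depends on the concrete type that happens to be thrown; with the wrapper, the producer makes the fatal/non-fatal decision in one place and shared exception types need no changes.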
