Thanks for your clarification on 2)/3), that makes sense.
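
For reference, here is a minimal Java sketch of the boolean-return pattern quoted below. `TxnClient`, `CommitFlagPattern`, and `Outcome` are hypothetical names used only for illustration; this is not the real KafkaProducer API, and `RuntimeException` stands in for KafkaException so the sketch compiles without Kafka on the classpath.

```java
/** Hypothetical stand-in for a transactional producer; NOT the real KafkaProducer API. */
interface TxnClient {
    void beginTxn();

    /** true = committed; false = non-fatal failure and the txn is already aborted; throws = fatal. */
    boolean commitTxn();

    void abortTxn();
}

final class CommitFlagPattern {
    enum Outcome { COMMITTED, ROLLED_BACK, SHUT_DOWN }

    /** Runs one transaction and reports what the caller should do next. */
    static Outcome runOnce(TxnClient client, Runnable work) {
        boolean shouldCommit = true;
        try {
            client.beginTxn();
            work.run();                       // the "do something" step
        } catch (RuntimeException e) {
            shouldCommit = false;             // data phase failed: abort instead of commit
        }

        boolean commitSucceeded = false;
        try {
            if (shouldCommit) {
                commitSucceeded = client.commitTxn();
            } else {
                client.abortTxn();
            }
        } catch (RuntimeException fatal) {
            // Assumption: anything thrown in the commit phase is fatal.
            return Outcome.SHUT_DOWN;
        }

        // COMMITTED: safe to do non-rollbackable things (e.g. commit the local DB).
        // ROLLED_BACK: reset offsets, roll back local operations, etc.
        return commitSucceeded ? Outcome.COMMITTED : Outcome.ROLLED_BACK;
    }
}
```

With this shape the caller never needs a second abortTxn() after a failed commit, and the commit call itself is the last point at which the outcome of the txn is decided.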

On Tue, Jan 19, 2021 at 10:16 AM Boyang Chen <reluctanthero...@gmail.com>
wrote:

> Thanks for the input Guozhang, replied inline.
>
> On Mon, Jan 18, 2021 at 8:57 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Boyang,
> >
> > Thanks for the updated KIP. I read it again and have the following
> > thoughts:
> >
> > 0. I'm a bit concerned that if commitTxn does not throw any non-fatal
> > exception, and instead we rely on the subsequent beginTxn call to throw,
> > it may break callers whose pattern relies on commitTxn succeeding before
> > they perform some non-rollback operations. For example:
> >
> > beginTxn()
> > // do some read-write on my local DB
> > commitTxn()
> > // if commitTxn succeeds, then commit the DB
> >
> > -------------
> >
> > The issue is that committing the DB is a non-rollback operation, and users
> > may just rely on commitTxn returning without error to make this
> > non-rollback call. Of course we can just claim this pattern is not
> > legitimate and is not the right way of doing things, but many users may
> > naturally adopt this pattern.
> >
> > So maybe we should still let commitTxn also throw non-fatal exceptions,
> > in which case we would then call abortTxn again.
> >
> > But if we do this, the pattern becomes:
> >
> > try {
> >    beginTxn()
> >    // do something
> > } catch (Exception) {
> >    shouldCommit = false;
> > }
> >
> > if (shouldCommit) {
> >     try {
> >         commitTxn()
> >     } catch (...) {        // enumerate all fatal exceptions
> >         shutdown()
> >     } catch (KafkaException) {
> >         // non-fatal
> >         shouldCommit = false;
> >     }
> > }
> >
> > if (!shouldCommit && running()) {
> >     try {
> >         abortTxn()
> >     } catch (KafkaException) {
> >         // only throw fatal
> >         shutdown()
> >     }
> > }
> >
> > ---------------------
> >
> > Which is much more complicated.
> >
> > That makes me think the alternative we have discussed offline may be
> > better: let commitTxn() return a boolean flag.
> >
> > * If it returns true, it means the commit succeeded. Users can
> > comfortably continue and do any external non-rollback operations if they
> > like.
> > * If it returns false, it means the commit failed with a non-fatal error
> > *AND the txn has been aborted*. Users do not need to call abortTxn again.
> > * If it throws, then it means fatal errors. Users should shut down the
> > client.
> >
> > In this case, the pattern becomes:
> >
> > try {
> >    beginTxn()
> >    // do something
> > } catch (Exception) {
> >    shouldCommit = false;
> > }
> >
> > try {
> >     if (shouldCommit) {
> >         commitSucceeded = commitTxn()
> >     } else {
> >         // reset offsets, rollback operations, etc
> >         abortTxn()
> >     }
> > } catch (KafkaException) {
> >     // only throw fatal
> >     shutdown()
> > }
> >
> > if (commitSucceeded)
> >    // do other non-rollbackable things
> > else
> >    // reset offsets, rollback operations, etc
> >
> > -------------------------
> >
> > Of course, if we want to have better visibility into what caused the
> > commit to fail and the txn to abort, we can let the return type be an
> > enum instead of a flag. But my main idea is to still let commitTxn be
> > the final point where users can tell whether this txn succeeded or not,
> > instead of relying on the next beginTxn() call.
> >
> I agree that adding a boolean flag is indeed useful in this case. Will
> update the KIP.
>
> > 1. Re: "while maintaining the behavior to throw fatal exception in raw":
> > I'm not sure what you mean by "throw" here. Are you proposing the callback
> > would *pass* (not throw) in any fatal exceptions when triggered without
> > wrapping?
> >
> Yes, I meant *pass*. The benefit is to make the end user's expectation
> consistent regarding exception handling.
>
>
> > 2. I'm not sure I understand the claim regarding the callback that "In
> > EOS setup, it is not required to handle the exception". Are you proposing
> > that, e.g. in Streams, we no longer try to handle any exceptions in the
> > callback if EOS is enabled, but just let commitTxn() itself fail to be
> > notified about the problem? Personally I think in Streams we should just
> > make the handling logic of the callback consistent regardless of the
> > EOS settings (today we have different logic depending on this setting,
> > which I think could be unified as well).
> >
> My idea originates from the claim on the send API:
> "When used as part of a transaction, it is not necessary to define a
> callback or check the result of the future in order to detect errors from
> <code>send</code>."
> My understanding is that for EOS, the exception will be detected by calling
> transactional APIs either way, so it is duplicate handling to track
> all the sendExceptions in RecordCollector. However, I looked up how
> sendException is being used today, as follows:
>
> 1. Pass to "ProductionExceptionHandler" for any default or customized
> exception handler to handle
> 2. Stop collecting offset info or new exceptions
> 3. Check and rethrow exceptions in close(), flush() or new send() calls
>
> To my understanding, we should still honor the commitment to #1 for any
> user-customized implementation. #2 does not really affect our decision
> upon EOS. #3 is still valuable, as it could help us fail fast in a
> new send() instead of waiting until a later stage of processing. In that
> sense, I agree we should continue to record send exceptions even under EOS
> to ensure the robustness of the Streams-side Producer logic. On the safer
> side, we no longer need to wrap certain fatal exceptions like ProducerFenced
> as TaskMigrated, since they should not crash the stream thread if thrown in
> raw format, once we adopt the new processing model in the send phase.
>
>
> >
> > Guozhang
> >
> >
> >
> > On Thu, Dec 17, 2020 at 8:42 PM Boyang Chen <reluctanthero...@gmail.com>
> > wrote:
> >
> > > Thanks for everyone's feedback so far. I have polished the KIP after
> > > offline discussion with some folks working on EOS to make the exception
> > > > handling more lightweight. The essential change is that we are not
> > > > inventing a new intermediate exception type, but instead separating a
> > > > full transaction into two phases:
> > >
> > > 1. The data transmission phase
> > > 2. The commit phase
> > >
> > > > This way, any exception thrown from phase 1 will be an indicator in
> > > > phase 2 of whether we should commit or abort, and from now on
> > > > `commitTransaction` should only throw fatal exceptions, similar to
> > > > `abortTransaction`, so that any KafkaException caught in the commit
> > > > phase will definitely be fatal and crash the app. For more advanced
> > > > users such as Streams, we have the ability to further wrap selected
> > > > types of fatal exceptions to trigger task migration if necessary.
> > >
> > > More details in the KIP, feel free to take another look, thanks!
> > >
> > > > On Thu, Dec 17, 2020 at 8:36 PM Boyang Chen <reluctanthero...@gmail.com>
> > > > wrote:
> > >
> > > > Thanks Bruno for the feedback.
> > > >
> > > > > On Mon, Dec 7, 2020 at 5:26 AM Bruno Cadonna <br...@confluent.io>
> > > > > wrote:
> > > >
> > > >> Thanks Boyang for the KIP!
> > > >>
> > > > >> Like Matthias, I also do not know the producer internals well enough
> > > > >> to comment on the categorization. However, I think having a super
> > > > >> exception (e.g. RetriableException) that encodes whether an exception
> > > > >> is fatal or not is cleaner, easier to understand, and less
> > > > >> error-prone, because ideally when you add a new non-fatal exception
> > > > >> in the future you just need to think about letting it inherit from
> > > > >> the super exception, and all the rest of the code will just behave
> > > > >> correctly without the need to wrap the new exception into another
> > > > >> exception each time it is thrown (maybe it is thrown at different
> > > > >> locations in the code).
> > > >>
> > > > >> As far as I understand, the following statement from your previous
> > > > >> e-mail is the reason that currently such a super exception is not
> > > > >> possible:
> > > > >>
> > > > >> "Right now we have RetriableException type, if we are going to add a
> > > > >> `ProducerRetriableException` type, we have to put this new interface
> > > > >> as the parent of the RetriableException, because not all thrown
> > > > >> non-fatal exceptions are `retriable` in general for producer"
> > > > >>
> > > > >>
> > > > >> In the list of exceptions in your KIP, I found non-fatal exceptions
> > > > >> that do not inherit from RetriableException. I guess those are the
> > > > >> ones you are referring to in your statement:
> > > >>
> > > >> InvalidProducerEpochException
> > > >> InvalidPidMappingException
> > > >> TransactionAbortedException
> > > >>
> > > > >> All of those exceptions are non-fatal and do not inherit from
> > > > >> RetriableException. Is there a reason for that? If they inherited
> > > > >> from RetriableException we would be a bit closer to the super
> > > > >> exception I mentioned above.
> > > >>
> > > > > The reason is that the sender may catch those exceptions in the
> > > > > ProduceResponse, and it currently retries infinitely on
> > > > > RetriableException. To make sure we can trigger abortTransaction()
> > > > > by catching the thrown non-fatal exceptions and reinitialize the txn
> > > > > state, we chose not to let those exceptions inherit from
> > > > > RetriableException, so that they won't cause infinite retries in
> > > > > the sender.
> > > >
> > > >
> > > > >> With OutOfOrderSequenceException and UnknownProducerIdException, I
> > > > >> think I understand that their fatality depends on the type (i.e.
> > > > >> configuration) of the producer. That makes it difficult to have a
> > > > >> super exception that encodes the retriability as mentioned above.
> > > > >> Would it be possible to introduce exceptions that inherit from
> > > > >> RetriableException and that are thrown when those exceptions are
> > > > >> caught from the brokers and the type of the producer is such that
> > > > >> the exceptions are retriable?
> > > >>
> > > > > Yeah, I think in general it is difficult to get the exception type
> > > > > mixing all right. I have already proposed another solution, based on
> > > > > my offline discussion with some folks working on EOS, to make the
> > > > > handling more straightforward for end users without the need to
> > > > > distinguish exception fatality.
> > > >
> > > >> Best,
> > > >> Bruno
> > > >>
> > > >>
> > > >> On 04.12.20 19:34, Guozhang Wang wrote:
> > > > >> > Thanks Boyang for the proposal! I made a pass over the list and
> > > > >> > here are some thoughts:
> > > > >> >
> > > > >> > 0) Although this is not part of the public API, I think we should
> > > > >> > make sure that the suggested pattern (i.e. user can always call
> > > > >> > abortTxn() when handling non-fatal errors) is indeed supported.
> > > > >> > E.g. if the txn is already aborted by the broker side, then users
> > > > >> > can still call abortTxn, which would not throw another exception
> > > > >> > but just be treated as a no-op.
> > > >> >
> > > > >> > 1) *ConcurrentTransactionsException*: I think this error can also
> > > > >> > be returned but is not documented yet. This should be a non-fatal
> > > > >> > error.
> > > >> >
> > > > >> > 2) *InvalidTxnStateException*: this error is returned from the
> > > > >> > broker when a txn state transition failed (e.g. it is trying to
> > > > >> > transition to complete-commit while the current state is not
> > > > >> > prepare-commit). This error could indicate a bug in the client
> > > > >> > internal code or the broker code, OR a user error --- a similar
> > > > >> > error is ConcurrentTransactionsException, i.e. if Kafka is
> > > > >> > bug-free these exceptions would only be returned if users try to
> > > > >> > do something wrong, e.g. calling abortTxn right after a commitTxn,
> > > > >> > etc. So I'm thinking it should be a non-fatal error instead of a
> > > > >> > fatal error, wdyt?
> > > >> >
> > > > >> > 3) *KafkaException*: case i "indicates fatal transactional
> > > > >> > sequence (Fatal)", this is a bit conflicting with the
> > > > >> > *OutOfOrderSequenceException* that is treated as non-fatal. I
> > > > >> > guess your proposal is that OutOfOrderSequenceException would be
> > > > >> > treated either as fatal with a transactional producer, or
> > > > >> > non-fatal with an idempotent producer, is that right? If the
> > > > >> > producer is only configured with idempotency but not transactions,
> > > > >> > then throwing a TransactionStateCorruptedException for non-fatal
> > > > >> > errors would be confusing since users are not using transactions
> > > > >> > at all. So I suggest we always throw OutOfOrderSequenceException
> > > > >> > as-is (i.e. not wrapped) no matter how the producer is configured,
> > > > >> > and let the caller decide how to handle it based on whether it is
> > > > >> > only idempotent or transactional itself.
> > > >> >
> > > > >> > 4) Besides all the txn APIs, the `send()` callback / future can
> > > > >> > also throw txn-related exceptions; I think this KIP should cover
> > > > >> > this API as well?
> > > >> >
> > > > >> > 5) This is related to 1/2) above: sometimes those non-fatal errors
> > > > >> > like ConcurrentTxn or InvalidTxnState are not due to the state
> > > > >> > being corrupted at the broker side, but maybe users are doing
> > > > >> > something wrong. So I'm wondering if we should further distinguish
> > > > >> > those non-fatal errors between a) those that are caused by Kafka
> > > > >> > itself, e.g. a broker timed out and aborted a txn and later an
> > > > >> > endTxn request is received, and b) those where the user's API call
> > > > >> > pattern is incorrect, causing the request to be rejected with an
> > > > >> > error code from the broker. *TransactionStateCorruptedException*
> > > > >> > feels to me more like it is for case a), but not case b).
> > > >> >
> > > >> >
> > > >> > Guozhang
> > > >> >
> > > >> >
> > > > >> > On Wed, Dec 2, 2020 at 4:50 PM Boyang Chen <reluctanthero...@gmail.com>
> > > > >> > wrote:
> > > >> >
> > > > >> >> Thanks Matthias, I think your proposal makes sense as well. On
> > > > >> >> the pro side, we could have a universally agreed exception type
> > > > >> >> to be caught by the user, without having an extra layer on top of
> > > > >> >> the actual exceptions. I can see some downsides:
> > > >> >>
> > > > >> >> 1. The exception hierarchy will be more complex. Right now we
> > > > >> >> have RetriableException type, if we are going to add a
> > > > >> >> `ProducerRetriableException` type, we have to put this new
> > > > >> >> interface as the parent of the RetriableException, because not
> > > > >> >> all thrown non-fatal exceptions are `retriable` in general for
> > > > >> >> producer, for example
> > > > >> >> <https://github.com/apache/kafka/blob/e275742f850af4a1b79b0d1bd1ac9a1d2e89c64e/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L745>.
> > > > >> >> We could have an empty interface `ProducerRetriableException` for
> > > > >> >> all the thrown exceptions to implement, for sure, but it's a bit
> > > > >> >> unorthodox in the way we deal with exceptions in general.
> > > >> >>
> > > > >> >> 2. There are cases where we throw a KafkaException wrapping
> > > > >> >> another KafkaException as either fatal or non-fatal. If we use an
> > > > >> >> interface to solve #1, it is also required to implement another
> > > > >> >> bloated exception class which could replace the KafkaException
> > > > >> >> type, as we certainly couldn't mark KafkaException as retriable.
> > > >> >>
> > > > >> >> 3. In terms of encapsulation, wrapping means we could limit the
> > > > >> >> scope of impact to the producer only, which is important since we
> > > > >> >> don't want shared exception types, such as
> > > > >> >> UnknownTopicOrPartitionException, to implement a producer-related
> > > > >> >> interface.
> > > >> >>
> > > >> >> Best,
> > > >> >> Boyang
> > > >> >>
> > > > >> >> On Wed, Dec 2, 2020 at 3:38 PM Matthias J. Sax <mj...@apache.org>
> > > > >> >> wrote:
> > > >> >>
> > > >> >>> Thanks for the KIP Boyang!
> > > >> >>>
> > > > >> >>> Overall, categorizing exceptions makes a lot of sense. As I
> > > > >> >>> don't know the producer internals well enough, I cannot comment
> > > > >> >>> on the categorization in detail though.
> > > > >> >>>
> > > > >> >>> What I am wondering is whether we should introduce an exception
> > > > >> >>> interface that non-fatal exceptions would implement, instead of
> > > > >> >>> creating a new class that will wrap non-fatal exceptions? What
> > > > >> >>> would be the pros/cons for both designs?
> > > >> >>>
> > > >> >>>
> > > >> >>> -Matthias
> > > >> >>>
> > > >> >>>
> > > >> >>> On 12/2/20 11:35 AM, Boyang Chen wrote:
> > > >> >>>> Hey there,
> > > >> >>>>
> > > > >> >>>> I would like to start a discussion thread for KIP-691:
> > > > >> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-691%3A+Enhance+Transactional+Producer+Exception+Handling
> > > >> >>>>
> > > > >> >>>> The KIP aims to simplify the exception handling logic for
> > > > >> >>>> transactional Producer users by classifying fatal and non-fatal
> > > > >> >>>> exceptions and throwing them correspondingly for easier catch
> > > > >> >>>> and retry. Let me know what you think.
> > > >> >>>>
> > > >> >>>> Best,
> > > >> >>>> Boyang
> > > >> >>>>
> > > >> >>>
> > > >> >>
> > > >> >
> > > >> >
> > > >>
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang
