Hi Justine,

Thanks for the proposal.

I was thinking about the implementation a little bit. In the current
proposal, the behavior depends on whether we have an old or new client. For
old clients, we send `DescribeTransactions` and verify the result, and for
new clients, we send `AddPartitionsToTxn`. We might be able to simplify the
implementation if we can use the same request type. For example, what if we
bump the protocol version for `AddPartitionsToTxn` and add a `validateOnly`
flag? For older versions, we can set `validateOnly=true` so that the
request only returns successfully if the partition has already been added.
For new versions, we can set `validateOnly=false` and the partition will be
added to the transaction. This would also get around the slightly annoying
need to collect the transaction state for all partitions even when we only
care about a subset.
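
To make this concrete, here is a minimal sketch of the proposed semantics.
This is only an illustration of the idea, not actual broker code: the
`validateOnly` flag, class, and data structures below are assumptions.

import java.util.*;

// Transaction state is modeled here as a map from transactionalId to the
// set of partitions added to the ongoing transaction.
class AddPartitionsToTxnSketch {
    private final Map<String, Set<String>> txnPartitions = new HashMap<>();

    // Returns true if the request succeeds under the proposed semantics.
    boolean handleAddPartitions(String transactionalId, String topicPartition,
                                boolean validateOnly) {
        Set<String> added =
            txnPartitions.computeIfAbsent(transactionalId, id -> new HashSet<>());
        if (validateOnly) {
            // Older clients: only verify that the producer already added
            // the partition via its own AddPartitionsToTxn call.
            return added.contains(topicPartition);
        }
        // Newer clients: add the partition to the transaction on behalf
        // of the producer.
        added.add(topicPartition);
        return true;
    }
}

Either way, the partition leader sends a single request type and the
transaction coordinator decides whether to verify or add based on the flag.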

Some additional improvements to consider:

- We can give `AddPartitionsToTxn` better batch support for inter-broker
usage. Currently we only allow one `TransactionalId` to be specified, but
the broker may get some benefit from being able to batch across multiple
transactions (see the sketch after this list).
- Another small improvement is skipping topic authorization checks for
`AddPartitionsToTxn` when the request comes from a broker. Perhaps we can
add a field for the `LeaderId` or something like that and require CLUSTER
permission when it is set.
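
Since both of these touch the request shape, here is a rough sketch of what
a combined inter-broker request could look like. The field names
(`leaderId`, `validateOnly`, etc.) are placeholders for illustration, not a
proposed final schema:

import java.util.List;

// One entry per transaction, so a broker can batch validation/addition for
// several transactional IDs in a single request.
record TransactionEntry(String transactionalId,
                        long producerId,
                        short producerEpoch,
                        boolean validateOnly,
                        List<String> topicPartitions) {}

// leaderId < 0 would mean the request came from a client and normal topic
// authorization applies; when it is set, the request is broker-to-broker
// and would require CLUSTER permission instead.
record AddPartitionsToTxnBatch(int leaderId,
                               List<TransactionEntry> transactions) {}

Reusing a per-transaction `validateOnly` flag here would let old-client
verification and new-client addition share the same batched path.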

Best,
Jason



On Mon, Dec 19, 2022 at 3:56 PM Jun Rao <j...@confluent.io.invalid> wrote:

> Hi, Justine,
>
> Thanks for the explanation. It makes sense to me now.
>
> Jun
>
> On Mon, Dec 19, 2022 at 1:42 PM Justine Olshan
> <jols...@confluent.io.invalid>
> wrote:
>
> > Hi Jun,
> >
> > My understanding of the mechanism is that when we get to the last epoch,
> we
> > increment to the fencing/last epoch and if any further requests come in
> for
> > this producer ID they are fenced. Then the producer gets a new ID and
> > restarts with epoch/sequence 0. The fenced epoch sticks around for the
> > duration of producer.id.expiration.ms and blocks any late messages
> there.
> > The new ID will get to take advantage of the improved semantics around
> > non-zero start sequences. So I think we are covered.
> >
> > The only potential issue is overloading the cache, but hopefully the
> > improvements (lowered producer.id.expiration.ms) will help with that.
> Let
> > me know if you still have concerns.
> >
> > Thanks,
> > Justine
> >
> > On Mon, Dec 19, 2022 at 10:24 AM Jun Rao <j...@confluent.io.invalid>
> wrote:
> >
> > > Hi, Justine,
> > >
> > > Thanks for the explanation.
> > >
> > > 70. The proposed fencing logic doesn't apply when pid changes, is that
> > > right? If so, I am not sure how complete we are addressing this issue
> if
> > > the pid changes more frequently.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > >
> > > On Fri, Dec 16, 2022 at 9:16 AM Justine Olshan
> > > <jols...@confluent.io.invalid>
> > > wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > Thanks for replying!
> > > >
> > > > 70. We already use the overflow mechanism, so my change would just make
> > it
> > > > happen more often.
> > > > I was also not suggesting a new field in the log, but in the
> response,
> > > > which would be gated by the client version. Sorry if something there
> is
> > > > unclear. I think we are starting to diverge.
> > > > The goal of this KIP is to not change the marker format at all.
> > > >
> > > > 71. Yes, I guess I was going under the assumption that the log would
> > just
> > > > look at its last epoch and treat it as the current epoch. I suppose
> we
> > > can
> > > > have some special logic that if the last epoch was on a marker we
> > > actually
> > > > expect the next epoch or something like that. We just need to
> > distinguish
> > > > based on whether we had a commit/abort marker.
> > > >
> > > > 72.
> > > > > if the producer epoch hasn't been bumped on the
> > > > broker, it seems that the stuck message will fail the sequence
> > > validation
> > > > and will be ignored. If the producer epoch has been bumped, we ignore
> > the
> > > > sequence check and the stuck message could be appended to the log.
> So,
> > is
> > > > the latter case that we want to guard?
> > > >
> > > > I'm not sure I follow that "the message will fail the sequence
> > > validation".
> > > > In some of these cases, we had an abort marker (due to an error) and
> > then
> > > > the late message comes in with the correct sequence number. This is a
> > > case
> > > > covered by the KIP.
> > > > The latter case is actually not something we've considered here. I
> > think
> > > > generally when we bump the epoch, we are accepting that the sequence
> > does
> > > > not need to be checked anymore. My understanding is also that we
> don't
> > > > typically bump epoch mid transaction (based on a quick look at the
> > code)
> > > > but let me know if that is the case.
> > > >
> > > > Thanks,
> > > > Justine
> > > >
> > > > On Thu, Dec 15, 2022 at 12:23 PM Jun Rao <j...@confluent.io.invalid>
> > > wrote:
> > > >
> > > > > Hi, Justine,
> > > > >
> > > > > Thanks for the reply.
> > > > >
> > > > > 70. Assigning a new pid on int overflow seems a bit hacky. If we
> > need a
> > > > txn
> > > > > level id, it will be better to model this explicitly. Adding a new
> > > field
> > > > > would require a bit more work since it requires a new txn marker
> > format
> > > > in
> > > > > the log. So, we probably need to guard it with an IBP or metadata
> > > version
> > > > > and document the impact on downgrade once the new format is written
> > to
> > > > the
> > > > > log.
> > > > >
> > > > > 71. Hmm, once the marker is written, the partition will expect the
> > next
> > > > > append to be on the next epoch. Does that cover the case you
> > mentioned?
> > > > >
> > > > > 72. Also, just to be clear on the stuck message issue described
> in
> > > the
> > > > > motivation. With EoS, we also validate the sequence id for
> > idempotency.
> > > > So,
> > > > > with the current logic, if the producer epoch hasn't been bumped on
> > the
> > > > > broker, it seems that the stuck message will fail the sequence
> > > > validation
> > > > > and will be ignored. If the producer epoch has been bumped, we
> ignore
> > > the
> > > > > sequence check and the stuck message could be appended to the log.
> > So,
> > > is
> > > > > the latter case that we want to guard?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Wed, Dec 14, 2022 at 10:44 AM Justine Olshan
> > > > > <jols...@confluent.io.invalid> wrote:
> > > > >
> > > > > > Matthias — thanks again for taking the time to look at this. You said:
> > > > > >
> > > > > > > My proposal was only focused on avoiding dangling
> > > > > >
> > > > > > transactions if records are added without a registered partition.
> --
> > > > Maybe
> > > > > >
> > > > > > you can add a few more details to the KIP about this scenario for
> > > > better
> > > > > >
> > > > > > documentation purpose?
> > > > > >
> > > > > >
> > > > > > I'm not sure I understand what you mean here. The motivation
> > section
> > > > > > describes two scenarios about how the record can be added
> without a
> > > > > > registered partition:
> > > > > >
> > > > > >
> > > > > > > This can happen when a message gets stuck or delayed due to
> > > > networking
> > > > > > issues or a network partition, the transaction aborts, and then
> the
> > > > > delayed
> > > > > > message finally comes in.
> > > > > >
> > > > > >
> > > > > > > Another way hanging transactions can occur is that a client is
> > > buggy
> > > > > and
> > > > > > may somehow try to write to a partition before it adds the
> > partition
> > > to
> > > > > the
> > > > > > transaction.
> > > > > >
> > > > > >
> > > > > >
> > > > > > For the first example of this would it be helpful to say that
> this
> > > > > message
> > > > > > comes in after the abort, but before the partition is added to
> the
> > > next
> > > > > > transaction so it becomes "hanging." Perhaps the next sentence
> > > > describing
> > > > > > the message becoming part of the next transaction (a different
> > case)
> > > > was
> > > > > > not properly differentiated.
> > > > > >
> > > > > >
> > > > > >
> > > > > > Jun — thanks for reading the KIP.
> > > > > >
> > > > > > 70. The int typing was a concern. Currently we have a mechanism
> in
> > > > place
> > > > > to
> > > > > > fence the final epoch when the epoch is about to overflow and
> > assign
> > > a
> > > > > new
> > > > > > producer ID with epoch 0. Of course, this is a bit tricky when it
> > > comes
> > > > > to
> > > > > > the response back to the client.
> > > > > > Making this a long could be another option, but I wonder are
> there
> > > any
> > > > > > implications on changing this field if the epoch is persisted to
> > > disk?
> > > > > I'd
> > > > > > need to check the usages.
> > > > > >
> > > > > > 71.This was something Matthias asked about as well. I was
> > > considering a
> > > > > > possible edge case where a produce request from a new transaction
> > > > somehow
> > > > > > gets sent right after the marker is written, but before the
> > producer
> > > is
> > > > > > alerted of the newly bumped epoch. In this case, we may include
> > this
> > > > > record
> > > > > > when we don't want to. I suppose we could try to do something
> > client
> > > > side
> > > > > > to bump the epoch after sending an endTxn as well in this
> scenario
> > —
> > > > but
> > > > > I
> > > > > > wonder how it would work when the server is aborting based on a
> > > > > server-side
> > > > > > error. I could also be missing something and this scenario is
> > > actually
> > > > > not
> > > > > > possible.
> > > > > >
> > > > > > Thanks again to everyone reading and commenting. Let me know
> about
> > > any
> > > > > > further questions or comments.
> > > > > >
> > > > > > Justine
> > > > > >
> > > > > > On Wed, Dec 14, 2022 at 9:41 AM Jun Rao <j...@confluent.io.invalid
> >
> > > > > wrote:
> > > > > >
> > > > > > > Hi, Justine,
> > > > > > >
> > > > > > > Thanks for the KIP. A couple of comments.
> > > > > > >
> > > > > > > 70. Currently, the producer epoch is an int. I am not sure if
> > it's
> > > > > enough
> > > > > > > to accommodate all transactions in the lifetime of a producer.
> > > Should
> > > > > we
> > > > > > > change that to a long or add a new long field like txnId?
> > > > > > >
> > > > > > > 71. "it will write the prepare commit message with a bumped
> epoch
> > > and
> > > > > > send
> > > > > > > WriteTxnMarkerRequests with the bumped epoch." Hmm, the epoch
> is
> > > > > > associated
> > > > > > > with the current txn right? So, it seems weird to write a
> commit
> > > > > message
> > > > > > > with a bumped epoch. Should we only bump up the epoch in
> > > > EndTxnResponse
> > > > > > and
> > > > > > > rename the field to sth like nextProducerEpoch?
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Dec 12, 2022 at 8:54 PM Matthias J. Sax <
> > mj...@apache.org>
> > > > > > wrote:
> > > > > > >
> > > > > > > > Thanks for the background.
> > > > > > > >
> > > > > > > > 20/30: SGTM. My proposal was only focused on avoiding dangling
> > > > > > > > transactions if records are added without a registered
> partition.
> > > --
> > > > > > Maybe
> > > > > > > > you can add a few more details to the KIP about this scenario
> > for
> > > > > > better
> > > > > > > > documentation purpose?
> > > > > > > >
> > > > > > > > 40: I think you hit a fair point about race conditions or
> > client
> > > > bugs
> > > > > > > > (incorrectly not bumping the epoch). The complexity/confusion
> > for
> > > > > using
> > > > > > > > the bumped epoch I see, is mainly for internal debugging, ie,
> > > > > > inspecting
> > > > > > > > log segment dumps -- it seems harder to reason about the
> system
> > > for
> > > > > us
> > > > > > > > humans. But if we get better guarantees, it would be worth
> > using
> > > > the
> > > > > > > > bumped epoch.
> > > > > > > >
> > > > > > > > 60: as I mentioned already, I don't know the broker internals
> > to
> > > > > > provide
> > > > > > > > more input. So if nobody else chimes in, we should just move
> > > > forward
> > > > > > > > with your proposal.
> > > > > > > >
> > > > > > > >
> > > > > > > > -Matthias
> > > > > > > >
> > > > > > > >
> > > > > > > > On 12/6/22 4:22 PM, Justine Olshan wrote:
> > > > > > > > > Hi all,
> > > > > > > > > After Artem's questions about error behavior, I've
> > re-evaluated
> > > > the
> > > > > > > > > unknown producer ID exception and had some discussions
> > offline.
> > > > > > > > >
> > > > > > > > > I think generally it makes sense to simplify error handling
> > in
> > > > > cases
> > > > > > > like
> > > > > > > > > this and the UNKNOWN_PRODUCER_ID error has a pretty long
> and
> > > > > > > complicated
> > > > > > > > > history. Because of this, I propose adding a new error code
> > > > > > > > ABORTABLE_ERROR
> > > > > > > > > that when encountered by new clients (gated by the produce
> > > > request
> > > > > > > > version)
> > > > > > > > > will simply abort the transaction. This allows the server
> to
> > > have
> > > > > > some
> > > > > > > > say
> > > > > > > > > in whether the client aborts and makes handling much
> simpler.
> > > In
> > > > > the
> > > > > > > > > future, we can also use this error in other situations
> where
> > we
> > > > > want
> > > > > > to
> > > > > > > > > abort the transactions. We can even use it on other APIs.
> > > > > > > > >
> > > > > > > > > I've added this to the KIP. Let me know if there are any
> > > > questions
> > > > > or
> > > > > > > > > issues.
> > > > > > > > >
> > > > > > > > > Justine
> > > > > > > > >
> > > > > > > > > On Fri, Dec 2, 2022 at 10:22 AM Justine Olshan <
> > > > > jols...@confluent.io
> > > > > > >
> > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > >> Hey Matthias,
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >> 20/30 — Maybe I also didn't express myself clearly. For
> > older
> > > > > > clients
> > > > > > > we
> > > > > > > > >> don't have a way to distinguish between a previous and the
> > > > current
> > > > > > > > >> transaction since we don't have the epoch bump. This means
> > > that
> > > > a
> > > > > > late
> > > > > > > > >> message from the previous transaction may be added to the
> > new
> > > > one.
> > > > > > > With
> > > > > > > > >> older clients — we can't guarantee this won't happen if we
> > > > already
> > > > > > > sent
> > > > > > > > the
> > > > > > > > >> addPartitionsToTxn call (why we make changes for the newer
> > > > client)
> > > > > > but
> > > > > > > > we
> > > > > > > > >> can at least gate some by ensuring that the partition has
> > been
> > > > > added
> > > > > > > to
> > > > > > > > the
> > > > > > > > >> transaction. The rationale here is that there are likely
> > LESS
> > > > late
> > > > > > > > arrivals
> > > > > > > > >> as time goes on, so hopefully most late arrivals will come
> > in
> > > > > BEFORE
> > > > > > > the
> > > > > > > > >> addPartitionsToTxn call. Those that arrive before will be
> > > > properly
> > > > > > > gated
> > > > > > > > >> with the describeTransactions approach.
> > > > > > > > >>
> > > > > > > > >> If we take the approach you suggested, ANY late arrival
> > from a
> > > > > > > previous
> > > > > > > > >> transaction will be added. And we don't want that. I also
> > > don't
> > > > > see
> > > > > > > any
> > > > > > > > >> benefit in sending addPartitionsToTxn over the
> describeTxns
> > > > call.
> > > > > > They
> > > > > > > > will
> > > > > > > > >> both be one extra RPC to the Txn coordinator.
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >> To be clear — newer clients will use addPartitionsToTxn
> > > instead
> > > > of
> > > > > > the
> > > > > > > > >> DescribeTxns.
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >> 40)
> > > > > > > > >> My concern is that if we have some delay in the client to
> > bump
> > > > the
> > > > > > > > epoch,
> > > > > > > > >> it could continue to send epoch 73 and those records would
> > not
> > > > be
> > > > > > > > fenced.
> > > > > > > > >> Perhaps this is not an issue if we don't allow the next
> > > produce
> > > > to
> > > > > > go
> > > > > > > > >> through before the EndTxn request returns. I'm also
> thinking
> > > > about
> > > > > > > > cases of
> > > > > > > > >> failure. I will need to think on this a bit.
> > > > > > > > >>
> > > > > > > > >> I wasn't sure if it was that confusing. But if we think it
> > is,
> > > > we
> > > > > > can
> > > > > > > > >> investigate other ways.
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >> 60)
> > > > > > > > >>
> > > > > > > > >> I'm not sure these are the same purgatories since one is a
> > > > produce
> > > > > > > > >> purgatory (I was planning on using a callback rather than
> > > > > purgatory)
> > > > > > > and
> > > > > > > > >> the other is simply a request to append to the log. Not
> sure
> > > we
> > > > > have
> > > > > > > any
> > > > > > > > >> structure here for ordering, but my understanding is that
> > the
> > > > > broker
> > > > > > > > could
> > > > > > > > >> handle the write request before it hears back from the Txn
> > > > > > > Coordinator.
> > > > > > > > >>
> > > > > > > > >> Let me know if I misunderstood something or something was
> > > > unclear.
> > > > > > > > >>
> > > > > > > > >> Justine
> > > > > > > > >>
> > > > > > > > >> On Thu, Dec 1, 2022 at 12:15 PM Matthias J. Sax <
> > > > mj...@apache.org
> > > > > >
> > > > > > > > wrote:
> > > > > > > > >>
> > > > > > > > >>> Thanks for the details Justine!
> > > > > > > > >>>
> > > > > > > > >>>> 20)
> > > > > > > > >>>>
> > > > > > > > >>>> The client side change for 2 is removing the
> addPartitions
> > > to
> > > > > > > > >>> transaction
> > > > > > > > >>>> call. We don't need to make this from the producer to
> the
> > > txn
> > > > > > > > >>> coordinator,
> > > > > > > > >>>> only server side.
> > > > > > > > >>>
> > > > > > > > >>> I think I did not express myself clearly. I understand
> that
> > > we
> > > > > can
> > > > > > > (and
> > > > > > > > >>> should) change the producer to not send the
> `addPartitions`
> > > > > request
> > > > > > > any
> > > > > > > > >>> longer. But I don't think it's a requirement to change the
> > > > broker?
> > > > > > > > >>>
> > > > > > > > >>> What I am trying to say is: as a safe-guard and
> improvement
> > > for
> > > > > > older
> > > > > > > > >>> producers, the partition leader can just send the
> > > > `addPartitions`
> > > > > > > > >>> request to the TX-coordinator in any case -- if the old
> > > > producer
> > > > > > > > >>> correctly did send the `addPartition` request to the
> > > > > TX-coordinator
> > > > > > > > >>> already, the TX-coordinator can just "ignore" it as
> > > idempotent.
> > > > > > > > However,
> > > > > > > > >>> if the old producer has a bug and did forget to send the
> > > > > > > `addPartition`
> > > > > > > > >>> request, we would now ensure that the partition is indeed
> > > added
> > > > > to
> > > > > > > the
> > > > > > > > >>> TX and thus fix a potential producer bug (even if we
> don't
> > > get
> > > > > the
> > > > > > > > >>> fencing via the epoch bump). -- It seems to be a good
> > > > > improvement?
> > > > > > Or
> > > > > > > > is
> > > > > > > > >>> there a reason to not do this?
> > > > > > > > >>>
> > > > > > > > >>>
> > > > > > > > >>>
> > > > > > > > >>>> 30)
> > > > > > > > >>>>
> > > > > > > > >>>> Transaction is ongoing = partition was added to
> > transaction
> > > > via
> > > > > > > > >>>> addPartitionsToTxn. We check this with the
> > > > DescribeTransactions
> > > > > > > call.
> > > > > > > > >>> Let
> > > > > > > > >>>> me know if this wasn't sufficiently explained here:
> > > > > > > > >>>
> > > > > > > > >>> If we do what I propose in (20), we don't really need to
> > make
> > > > > this
> > > > > > > > >>> `DescribeTransaction` call, as the partition leader adds
> > the
> > > > > > > partition
> > > > > > > > >>> for older clients and we get this check for free.
> > > > > > > > >>>
> > > > > > > > >>>
> > > > > > > > >>>> 40)
> > > > > > > > >>>>
> > > > > > > > >>>> The idea here is that if any messages somehow come in
> > before
> > > > we
> > > > > > get
> > > > > > > > the
> > > > > > > > >>> new
> > > > > > > > >>>> epoch to the producer, they will be fenced. However, if
> we
> > > > don't
> > > > > > > think
> > > > > > > > >>> this
> > > > > > > > >>>> is necessary, it can be discussed
> > > > > > > > >>>
> > > > > > > > >>> I agree that we should have epoch fencing. My question is
> > > > > > different:
> > > > > > > > >>> Assume we are at epoch 73, and we have an ongoing
> > > transaction,
> > > > > that
> > > > > > > is
> > > > > > > > >>> committed. It seems natural to write the "prepare commit"
> > > > marker
> > > > > > and
> > > > > > > > the
> > > > > > > > >>> `WriteTxMarkerRequest` both with epoch 73, too, as it
> > belongs
> > > > to
> > > > > > the
> > > > > > > > >>> current transaction. Of course, we now also bump the
> epoch
> > > and
> > > > > > expect
> > > > > > > > >>> the next requests to have epoch 74, and would reject a
> > > request
> > > > > > with
> > > > > > > > >>> epoch 73, as the corresponding TX for epoch 73 was
> already
> > > > > > committed.
> > > > > > > > >>>
> > > > > > > > >>> It seems you propose to write the "prepare commit marker"
> > and
> > > > > > > > >>> `WriteTxMarkerRequest` with epoch 74 though, which would
> > work,
> > > > but
> > > > > > it
> > > > > > > > >>> seems confusing. Is there a reason why we would use the
> > > bumped
> > > > > > epoch
> > > > > > > 74
> > > > > > > > >>> instead of the current epoch 73?
> > > > > > > > >>>
> > > > > > > > >>>
> > > > > > > > >>>> 60)
> > > > > > > > >>>>
> > > > > > > > >>>> When we are checking if the transaction is ongoing, we
> > need
> > > to
> > > > > > make
> > > > > > > a
> > > > > > > > >>> round
> > > > > > > > >>>> trip from the leader partition to the transaction
> > > coordinator.
> > > > > In
> > > > > > > the
> > > > > > > > >>> time
> > > > > > > > >>>> we are waiting for this message to come back, in theory
> we
> > > > could
> > > > > > > have
> > > > > > > > >>> sent
> > > > > > > > >>>> a commit/abort call that would make the original result
> of
> > > the
> > > > > > check
> > > > > > > > >>> out of
> > > > > > > > >>>> date. That is why we can check the leader state before
> we
> > > > write
> > > > > to
> > > > > > > the
> > > > > > > > >>> log.
> > > > > > > > >>>
> > > > > > > > >>> Thanks. Got it.
> > > > > > > > >>>
> > > > > > > > >>> However, is this really an issue? We put the produce
> > request
> > > in
> > > > > > > > >>> purgatory, so how could we process the
> > > `WriteTxnMarkerRequest`
> > > > > > first?
> > > > > > > > >>> Don't we need to put the `WriteTxnMarkerRequest` into
> > > > purgatory,
> > > > > > too,
> > > > > > > > >>> for this case, and process both requests in order? (Again,
> > my
> > > > > broker
> > > > > > > > >>> knowledge is limited and maybe we don't maintain request
> > > order
> > > > > for
> > > > > > > this
> > > > > > > > >>> case, what seems to be an issue IMHO, and I am wondering
> if
> > > > > > changing
> > > > > > > > >>> request handling to preserve order for this case might be
> > the
> > > > > > cleaner
> > > > > > > > >>> solution?)
> > > > > > > > >>>
> > > > > > > > >>>
> > > > > > > > >>>
> > > > > > > > >>> -Matthias
> > > > > > > > >>>
> > > > > > > > >>>
> > > > > > > > >>>
> > > > > > > > >>>
> > > > > > > > >>> On 11/30/22 3:28 PM, Artem Livshits wrote:
> > > > > > > > >>>> Hi Justine,
> > > > > > > > >>>>
> > > > > > > > >>>> I think the interesting part is not in this logic
> (because
> > > it
> > > > > > tries
> > > > > > > to
> > > > > > > > >>>> figure out when UNKNOWN_PRODUCER_ID is retriable and if
> > it's
> > > > > > > > retryable,
> > > > > > > > >>>> it's definitely not fatal), but what happens when this
> > logic
> > > > > > doesn't
> > > > > > > > >>> return
> > > > > > > > >>>> 'true' and falls through.  In the old clients it seems
> to
> > be
> > > > > > fatal,
> > > > > > > if
> > > > > > > > >>> we
> > > > > > > > >>>> keep the behavior in the new clients, I'd expect it
> would
> > be
> > > > > fatal
> > > > > > > as
> > > > > > > > >>> well.
> > > > > > > > >>>>
> > > > > > > > >>>> -Artem
> > > > > > > > >>>>
> > > > > > > > >>>> On Tue, Nov 29, 2022 at 11:57 AM Justine Olshan
> > > > > > > > >>>> <jols...@confluent.io.invalid> wrote:
> > > > > > > > >>>>
> > > > > > > > >>>>> Hi Artem and Jeff,
> > > > > > > > >>>>>
> > > > > > > > >>>>>
> > > > > > > > >>>>> Thanks for taking a look and sorry for the slow
> response.
> > > > > > > > >>>>>
> > > > > > > > >>>>> You both mentioned the change to handle
> > UNKNOWN_PRODUCER_ID
> > > > > > errors.
> > > > > > > > To
> > > > > > > > >>> be
> > > > > > > > >>>>> clear — this error code will only be sent again when
> the
> > > > > client's
> > > > > > > > >>> request
> > > > > > > > >>>>> version is high enough to ensure we handle it
> correctly.
> > > > > > > > >>>>> The current (Java) client handles this by the following
> > > > > (somewhat
> > > > > > > > long)
> > > > > > > > >>>>> code snippet:
> > > > > > > > >>>>>
> > > > > > > > >>>>> // An UNKNOWN_PRODUCER_ID means that we have lost the producer state on the broker. Depending on the log start
> > > > > > > > >>>>> // offset, we may want to retry these, as described for each case below. If none of those apply, then for the
> > > > > > > > >>>>> // idempotent producer, we will locally bump the epoch and reset the sequence numbers of in-flight batches from
> > > > > > > > >>>>> // sequence 0, then retry the failed batch, which should now succeed. For the transactional producer, allow the
> > > > > > > > >>>>> // batch to fail. When processing the failed batch, we will transition to an abortable error and set a flag
> > > > > > > > >>>>> // indicating that we need to bump the epoch (if supported by the broker).
> > > > > > > > >>>>> if (error == Errors.UNKNOWN_PRODUCER_ID) {
> > > > > > > > >>>>>     if (response.logStartOffset == -1) {
> > > > > > > > >>>>>         // We don't know the log start offset with this response. We should just retry the request until we get it.
> > > > > > > > >>>>>         // The UNKNOWN_PRODUCER_ID error code was added along with the new ProduceResponse which includes the
> > > > > > > > >>>>>         // logStartOffset. So the '-1' sentinel is not for backward compatibility. Instead, it is possible for
> > > > > > > > >>>>>         // a broker to not know the logStartOffset at when it is returning the response because the partition
> > > > > > > > >>>>>         // may have moved away from the broker from the time the error was initially raised to the time the
> > > > > > > > >>>>>         // response was being constructed. In these cases, we should just retry the request: we are guaranteed
> > > > > > > > >>>>>         // to eventually get a logStartOffset once things settle down.
> > > > > > > > >>>>>         return true;
> > > > > > > > >>>>>     }
> > > > > > > > >>>>>
> > > > > > > > >>>>>     if (batch.sequenceHasBeenReset()) {
> > > > > > > > >>>>>         // When the first inflight batch fails due to the truncation case, then the sequences of all the other
> > > > > > > > >>>>>         // in flight batches would have been restarted from the beginning. However, when those responses
> > > > > > > > >>>>>         // come back from the broker, they would also come with an UNKNOWN_PRODUCER_ID error. In this case, we should not
> > > > > > > > >>>>>         // reset the sequence numbers to the beginning.
> > > > > > > > >>>>>         return true;
> > > > > > > > >>>>>     } else if (lastAckedOffset(batch.topicPartition).orElse(NO_LAST_ACKED_SEQUENCE_NUMBER) < response.logStartOffset) {
> > > > > > > > >>>>>         // The head of the log has been removed, probably due to the retention time elapsing. In this case,
> > > > > > > > >>>>>         // we expect to lose the producer state. For the transactional producer, reset the sequences of all
> > > > > > > > >>>>>         // inflight batches to be from the beginning and retry them, so that the transaction does not need to
> > > > > > > > >>>>>         // be aborted. For the idempotent producer, bump the epoch to avoid reusing (sequence, epoch) pairs
> > > > > > > > >>>>>         if (isTransactional()) {
> > > > > > > > >>>>>             txnPartitionMap.startSequencesAtBeginning(batch.topicPartition, this.producerIdAndEpoch);
> > > > > > > > >>>>>         } else {
> > > > > > > > >>>>>             requestEpochBumpForPartition(batch.topicPartition);
> > > > > > > > >>>>>         }
> > > > > > > > >>>>>         return true;
> > > > > > > > >>>>>     }
> > > > > > > > >>>>>
> > > > > > > > >>>>>     if (!isTransactional()) {
> > > > > > > > >>>>>         // For the idempotent producer, always retry UNKNOWN_PRODUCER_ID errors. If the batch has the current
> > > > > > > > >>>>>         // producer ID and epoch, request a bump of the epoch. Otherwise just retry the produce.
> > > > > > > > >>>>>         requestEpochBumpForPartition(batch.topicPartition);
> > > > > > > > >>>>>         return true;
> > > > > > > > >>>>>     }
> > > > > > > > >>>>> }
> > > > > > > > >>>>>
> > > > > > > > >>>>>
> > > > > > > > >>>>> I was considering keeping this behavior — but am open
> to
> > > > > > > simplifying
> > > > > > > > >>> it.
> > > > > > > > >>>>>
> > > > > > > > >>>>>
> > > > > > > > >>>>>
> > > > > > > > >>>>> We are leaving changes to older clients off the table
> > here
> > > > > since
> > > > > > it
> > > > > > > > >>> caused
> > > > > > > > >>>>> many issues for clients in the past. Previously this
> was
> > a
> > > > > fatal
> > > > > > > > error
> > > > > > > > >>> and
> > > > > > > > >>>>> we didn't have the mechanisms in place to detect when
> > this
> > > > was
> > > > > a
> > > > > > > > >>> legitimate
> > > > > > > > >>>>> case vs some bug or gap in the protocol. Ensuring each
> > > > > > transaction
> > > > > > > > has
> > > > > > > > >>> its
> > > > > > > > >>>>> own epoch should close this gap.
> > > > > > > > >>>>>
> > > > > > > > >>>>>
> > > > > > > > >>>>>
> > > > > > > > >>>>>
> > > > > > > > >>>>> And to address Jeff's second point:
> > > > > > > > >>>>> *does the typical produce request path append records
> to
> > > > local
> > > > > > log
> > > > > > > > >>> along*
> > > > > > > > >>>>>
> > > > > > > > >>>>> *with the currentTxnFirstOffset information? I would
> like
> > > to
> > > > > > > > >>> understand*
> > > > > > > > >>>>>
> > > > > > > > >>>>> *when the field is written to disk.*
> > > > > > > > >>>>>
> > > > > > > > >>>>>
> > > > > > > > >>>>> Yes, the first produce request populates this field and
> > > > writes
> > > > > > the
> > > > > > > > >>> offset
> > > > > > > > >>>>> as part of the record batch and also to the producer
> > state
> > > > > > > snapshot.
> > > > > > > > >>> When
> > > > > > > > >>>>> we reload the records on restart and/or reassignment,
> we
> > > > > > repopulate
> > > > > > > > >>> this
> > > > > > > > >>>>> field with the snapshot from disk along with the rest
> of
> > > the
> > > > > > > producer
> > > > > > > > >>>>> state.
> > > > > > > > >>>>>
> > > > > > > > >>>>> Let me know if there are further comments and/or
> > questions.
> > > > > > > > >>>>>
> > > > > > > > >>>>> Thanks,
> > > > > > > > >>>>> Justine
> > > > > > > > >>>>>
> > > > > > > > >>>>> On Tue, Nov 22, 2022 at 9:00 PM Jeff Kim
> > > > > > > > <jeff....@confluent.io.invalid
> > > > > > > > >>>>
> > > > > > > > >>>>> wrote:
> > > > > > > > >>>>>
> > > > > > > > >>>>>> Hi Justine,
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> Thanks for the KIP! I have two questions:
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> 1) For new clients, we can once again return an error
> > > > > > > > >>> UNKNOWN_PRODUCER_ID
> > > > > > > > >>>>>> for sequences
> > > > > > > > >>>>>> that are non-zero when there is no producer state
> > present
> > > on
> > > > > the
> > > > > > > > >>> server.
> > > > > > > > >>>>>> This will indicate we missed the 0 sequence and we
> don't
> > > yet
> > > > > > want
> > > > > > > to
> > > > > > > > >>>>> write
> > > > > > > > >>>>>> to the log.
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> I would like to understand the current behavior to
> > handle
> > > > > older
> > > > > > > > >>> clients,
> > > > > > > > >>>>>> and if there are any changes we are making. Maybe I'm
> > > > missing
> > > > > > > > >>> something,
> > > > > > > > >>>>>> but we would want to identify whether we missed the 0
> > > > sequence
> > > > > > for
> > > > > > > > >>> older
> > > > > > > > >>>>>> clients, no?
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> 2) Upon returning from the transaction coordinator, we
> > can
> > > > set
> > > > > > the
> > > > > > > > >>>>>> transaction
> > > > > > > > >>>>>> as ongoing on the leader by populating
> > > currentTxnFirstOffset
> > > > > > > > >>>>>> through the typical produce request handling.
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> does the typical produce request path append records
> to
> > > > local
> > > > > > log
> > > > > > > > >>> along
> > > > > > > > >>>>>> with the currentTxnFirstOffset information? I would
> like
> > > to
> > > > > > > > understand
> > > > > > > > >>>>>> when the field is written to disk.
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> Thanks,
> > > > > > > > >>>>>> Jeff
> > > > > > > > >>>>>>
> > > > > > > > >>>>>>
> > > > > > > > >>>>>> On Tue, Nov 22, 2022 at 4:44 PM Artem Livshits
> > > > > > > > >>>>>> <alivsh...@confluent.io.invalid> wrote:
> > > > > > > > >>>>>>
> > > > > > > > >>>>>>> Hi Justine,
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> Thank you for the KIP.  I have one question.
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> 5) For new clients, we can once again return an error
> > > > > > > > >>>>> UNKNOWN_PRODUCER_ID
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> I believe we had problems in the past with returning
> > > > > > > > >>>>> UNKNOWN_PRODUCER_ID
> > > > > > > > >>>>>>> because it was considered fatal and required client
> > > > restart.
> > > > > > It
> > > > > > > > >>> would
> > > > > > > > >>>>> be
> > > > > > > > >>>>>>> good to spell out the new client behavior when it
> > > receives
> > > > > the
> > > > > > > > error.
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> -Artem
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>> On Tue, Nov 22, 2022 at 10:00 AM Justine Olshan
> > > > > > > > >>>>>>> <jols...@confluent.io.invalid> wrote:
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>>> Thanks for taking a look Matthias. I've tried to
> > answer
> > > > your
> > > > > > > > >>>>> questions
> > > > > > > > >>>>>>>> below:
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> 10)
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> Right — so the hanging transaction only occurs when
> we
> > > > have
> > > > > a
> > > > > > > late
> > > > > > > > >>>>>>> message
> > > > > > > > >>>>>>>> come in and the partition is never added to a
> > > transaction
> > > > > > again.
> > > > > > > > If
> > > > > > > > >>>>> we
> > > > > > > > >>>>>>>> never add the partition to a transaction, we will
> > never
> > > > > write
> > > > > > a
> > > > > > > > >>>>> marker
> > > > > > > > >>>>>>> and
> > > > > > > > >>>>>>>> never advance the LSO.
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> If we do end up adding the partition to the
> > transaction
> > > (I
> > > > > > > suppose
> > > > > > > > >>>>> this
> > > > > > > > >>>>>>> can
> > > > > > > > >>>>>>>> happen before or after the late message comes in)
> then
> > > we
> > > > > will
> > > > > > > > >>>>> include
> > > > > > > > >>>>>>> the
> > > > > > > > >>>>>>>> late message in the next (incorrect) transaction.
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> So perhaps it is clearer to make the distinction
> > between
> > > > > > > messages
> > > > > > > > >>>>> that
> > > > > > > > >>>>>>>> eventually get added to the transaction (but the
> wrong
> > > > one)
> > > > > or
> > > > > > > > >>>>> messages
> > > > > > > > >>>>>>>> that never get added and become hanging.
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> 20)
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> The client side change for 2 is removing the
> > > addPartitions
> > > > > to
> > > > > > > > >>>>>> transaction
> > > > > > > > >>>>>>>> call. We don't need to make this from the producer
> to
> > > the
> > > > > txn
> > > > > > > > >>>>>>> coordinator,
> > > > > > > > >>>>>>>> only server side.
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> In my opinion, the issue with the addPartitionsToTxn
> > > call
> > > > > for
> > > > > > > > older
> > > > > > > > >>>>>>> clients
> > > > > > > > >>>>>>>> is that we don't have the epoch bump, so we don't
> know
> > > if
> > > > > the
> > > > > > > > >>> message
> > > > > > > > >>>>>>>> belongs to the previous transaction or this one. We
> > need
> > > > to
> > > > > > > check
> > > > > > > > if
> > > > > > > > >>>>>> the
> > > > > > > > >>>>>>>> partition has been added to this transaction. Of
> > course,
> > > > > this
> > > > > > > > means
> > > > > > > > >>>>> we
> > > > > > > > >>>>>>>> won't completely cover the case where we have a
> really
> > > > late
> > > > > > > > message
> > > > > > > > >>>>> and
> > > > > > > > >>>>>>> we
> > > > > > > > >>>>>>>> have added the partition to the new transaction, but
> > > > that's
> > > > > > > > >>>>>> unfortunately
> > > > > > > > >>>>>>>> something we will need the new clients to cover.
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> 30)
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> Transaction is ongoing = partition was added to
> > > > transaction
> > > > > > via
> > > > > > > > >>>>>>>> addPartitionsToTxn. We check this with the
> > > > > > DescribeTransactions
> > > > > > > > >>> call.
> > > > > > > > >>>>>> Let
> > > > > > > > >>>>>>>> me know if this wasn't sufficiently explained here:
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>
> > > > > > > > >>>>>
> > > > > > > > >>>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense#KIP890:TransactionsServerSideDefense-EnsureOngoingTransactionforOlderClients(3)
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> 40)
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> The idea here is that if any messages somehow come
> in
> > > > before
> > > > > > we
> > > > > > > > get
> > > > > > > > >>>>> the
> > > > > > > > >>>>>>> new
> > > > > > > > >>>>>>>> epoch to the producer, they will be fenced. However,
> > if
> > > we
> > > > > > don't
> > > > > > > > >>>>> think
> > > > > > > > >>>>>>> this
> > > > > > > > >>>>>>>> is necessary, it can be discussed
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> 50)
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> It should be synchronous because if we have an event
> > > (ie,
> > > > an
> > > > > > > > error)
> > > > > > > > >>>>>> that
> > > > > > > > >>>>>>>> causes us to need to abort the transaction, we need
> to
> > > > know
> > > > > > > which
> > > > > > > > >>>>>>>> partitions to send transaction markers to. We know
> the
> > > > > > > partitions
> > > > > > > > >>>>>> because
> > > > > > > > >>>>>>>> we added them to the coordinator via the
> > > > addPartitionsToTxn
> > > > > > > call.
> > > > > > > > >>>>>>>> Previously we have had asynchronous calls in the
> past
> > > (ie,
> > > > > > > writing
> > > > > > > > >>>>> the
> > > > > > > > >>>>>>>> commit markers when the transaction is completed)
> but
> > > > often
> > > > > > this
> > > > > > > > >>> just
> > > > > > > > >>>>>>>> causes confusion as we need to wait for some
> > operations
> > > to
> > > > > > > > complete.
> > > > > > > > >>>>> In
> > > > > > > > >>>>>>> the
> > > > > > > > >>>>>>>> writing commit markers case, clients often see
> > > > > > > > >>>>> CONCURRENT_TRANSACTIONs
> > > > > > > > >>>>>>>> error messages and that can be confusing. For that
> > > reason,
> > > > > it
> > > > > > > may
> > > > > > > > be
> > > > > > > > >>>>>>>> simpler to just have synchronous calls — especially
> if
> > > we
> > > > > need
> > > > > > > to
> > > > > > > > >>>>> block
> > > > > > > > >>>>>>> on
> > > > > > > > >>>>>>>> some operation's completion anyway before we can
> start
> > > the
> > > > > > next
> > > > > > > > >>>>>>>> transaction. And yes, I meant coordinator. I will
> fix
> > > > that.
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> 60)
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> When we are checking if the transaction is ongoing,
> we
> > > > need
> > > > > to
> > > > > > > > make
> > > > > > > > >>> a
> > > > > > > > >>>>>>> round
> > > > > > > > >>>>>>>> trip from the leader partition to the transaction
> > > > > coordinator.
> > > > > > > In
> > > > > > > > >>> the
> > > > > > > > >>>>>>> time
> > > > > > > > >>>>>>>> we are waiting for this message to come back, in
> > theory
> > > we
> > > > > > could
> > > > > > > > >>> have
> > > > > > > > >>>>>>> sent
> > > > > > > > >>>>>>>> a commit/abort call that would make the original
> > result
> > > of
> > > > > the
> > > > > > > > check
> > > > > > > > >>>>>> out
> > > > > > > > >>>>>>> of
> > > > > > > > >>>>>>>> date. That is why we can check the leader state
> before
> > > we
> > > > > > write
> > > > > > > to
> > > > > > > > >>>>> the
> > > > > > > > >>>>>>> log.
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> I'm happy to update the KIP if some of these things
> > were
> > > > not
> > > > > > > > clear.
> > > > > > > > >>>>>>>> Thanks,
> > > > > > > > >>>>>>>> Justine
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> On Mon, Nov 21, 2022 at 7:11 PM Matthias J. Sax <
> > > > > > > mj...@apache.org
> > > > > > > > >
> > > > > > > > >>>>>>> wrote:
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>>> Thanks for the KIP.
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> Couple of clarification questions (I am not a
> broker
> > > > expert
> > > > > > do
> > > > > > > > >>>>> maybe
> > > > > > > > >>>>>>>>> some question are obvious for others, but not for
> me
> > > with
> > > > > my
> > > > > > > lack
> > > > > > > > >>>>> of
> > > > > > > > >>>>>>>>> broker knowledge).
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> (10)
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>>> The delayed message case can also violate EOS if
> the
> > > > > delayed
> > > > > > > > >>>>>> message
> > > > > > > > >>>>>>>>> comes in after the next addPartitionsToTxn request
> > > comes
> > > > > in.
> > > > > > > > >>>>>>> Effectively
> > > > > > > > >>>>>>>> we
> > > > > > > > >>>>>>>>> may see a message from a previous (aborted)
> > transaction
> > > > > > become
> > > > > > > > part
> > > > > > > > >>>>>> of
> > > > > > > > >>>>>>>> the
> > > > > > > > >>>>>>>>> next transaction.
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> What happens if the message come in before the next
> > > > > > > > >>>>>> addPartitionsToTxn
> > > > > > > > >>>>>>>>> request? It seems the broker hosting the data
> > > partitions
> > > > > > won't
> > > > > > > > know
> > > > > > > > >>>>>>>>> anything about it and append it to the partition,
> > too?
> > > > What
> > > > > > is
> > > > > > > > the
> > > > > > > > >>>>>>>>> difference between both cases?
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> Also, it seems a TX would only hang, if there is no
> > > > > following
> > > > > > > TX
> > > > > > > > >>>>> that
> > > > > > > > >>>>>>> is
> > > > > > > > >>>>>>>>> either committed or aborted? Thus, for the case
> > above,
> > > > the
> > > > > TX
> > > > > > > > might
> > > > > > > > >>>>>>>>> actually not hang (of course, we might get an EOS
> > > > violation
> > > > > > if
> > > > > > > > the
> > > > > > > > >>>>>>> first
> > > > > > > > >>>>>>>>> TX was aborted and the second committed, or the
> other
> > > way
> > > > > > > > around).
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> (20)
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>>> Of course, 1 and 2 require client-side changes, so
> > for
> > > > > older
> > > > > > > > >>>>>> clients,
> > > > > > > > >>>>>>>>> those approaches won’t apply.
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> For (1) I understand why a client change is
> > necessary,
> > > > but
> > > > > > not
> > > > > > > > sure
> > > > > > > > >>>>>> why
> > > > > > > > >>>>>>>>> we need a client change for (2). Can you elaborate?
> > --
> > > > > Later
> > > > > > > you
> > > > > > > > >>>>>>> explain
> > > > > > > > >>>>>>>>> that we should send a DescribeTransactionRequest,
> > but I
> > > > am
> > > > > > not
> > > > > > > > sure
> > > > > > > > >>>>>>> why?
> > > > > > > > >>>>>>>>> Can't we just do an implicit AddPartitionToTxn,
> > too?
> > > > If
> > > > > > the
> > > > > > > > old
> > > > > > > > >>>>>>>>> producer correctly registered the partition
> already,
> > > the
> > > > > > > > >>>>>> TX-coordinator
> > > > > > > > >>>>>>>>> can just ignore it as it's an idempotent operation?
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> (30)
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>>> To cover older clients, we will ensure a
> transaction
> > > is
> > > > > > > ongoing
> > > > > > > > >>>>>>> before
> > > > > > > > >>>>>>>>> we write to a transaction
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> Not sure what you mean by this? Can you elaborate?
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> (40)
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>>> [the TX-coordinator] will write the prepare commit
> > > > message
> > > > > > > with
> > > > > > > > a
> > > > > > > > >>>>>>>> bumped
> > > > > > > > >>>>>>>>> epoch and send WriteTxnMarkerRequests with the
> bumped
> > > > > epoch.
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> Why do we use the bumped epoch for both? It seems
> > more
> > > > > > > intuitive
> > > > > > > > to
> > > > > > > > >>>>>> use
> > > > > > > > >>>>>>>>> the current epoch, and only return the bumped epoch
> > to
> > > > the
> > > > > > > > >>>>> producer?
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> (50) "Implicit AddPartitionToTransaction"
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> Why does the implicitly sent request need to be
> > > > > synchronous?
> > > > > > > The
> > > > > > > > >>>>> KIP
> > > > > > > > >>>>>>>>> also says
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>>> in case we need to abort and need to know which
> > > > partitions
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> What do you mean by this?
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>>> we don’t want to write to it before we store in
> the
> > > > > > > transaction
> > > > > > > > >>>>>>> manager
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> Do you mean TX-coordinator instead of "manager"?
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> (60)
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> For older clients and ensuring that the TX is
> > ongoing,
> > > > you
> > > > > > > > >>>>> describe a
> > > > > > > > >>>>>>>>> race condition. I am not sure if I can follow here.
> > Can
> > > > you
> > > > > > > > >>>>>> elaborate?
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> -Matthias
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> On 11/18/22 1:21 PM, Justine Olshan wrote:
> > > > > > > > >>>>>>>>>> Hey all!
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> I'd like to start a discussion on my proposal to
> add
> > > > some
> > > > > > > > >>>>>> server-side
> > > > > > > > >>>>>>>>>> checks on transactions to avoid hanging
> > transactions.
> > > I
> > > > > know
> > > > > > > > this
> > > > > > > > >>>>>> has
> > > > > > > > >>>>>>>>> been
> > > > > > > > >>>>>>>>>> an issue for some time, so I really hope this KIP
> > will
> > > > be
> > > > > > > > helpful
> > > > > > > > >>>>>> for
> > > > > > > > >>>>>>>>> many
> > > > > > > > >>>>>>>>>> users of EOS.
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> The KIP includes changes that will be compatible
> > with
> > > > old
> > > > > > > > clients
> > > > > > > > >>>>>> and
> > > > > > > > >>>>>>>>>> changes to improve performance and correctness on
> > new
> > > > > > clients.
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> Please take a look and leave any comments you may
> > > have!
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> KIP:
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>
> > > > > > > > >>>>>
> > > > > > > > >>>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense
> > > > > > > > >>>>>>>>>> JIRA:
> > > https://issues.apache.org/jira/browse/KAFKA-14402
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> Thanks!
> > > > > > > > >>>>>>>>>> Justine
> > > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>
> > > > > > > > >>>>>>
> > > > > > > > >>>>>
> > > > > > > > >>>>
> > > > > > > > >>>
> > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
