would it build an offset map with just the latest timestamp for a key?

Cannot remember the details without reading the KIP, but yes, something like this (I believe it actually needs to track both offset and timestamp per key).

I wonder if ordering assumptions are baked in there, why not use offset-based 
compaction.

The use case is a compacted topic that does contain out-of-order data. If you write k1=v1 with timestamp 5 at offset 100, and later k1=v0 with timestamp 3 at offset 200, we want to clean up v0 even though it has the higher offset, because it's out-of-order based on time, and keep v1, which is the actual latest version of k1.
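
To make the idea concrete, here is a rough, purely illustrative sketch of such an offset map (this is not the KIP-280 implementation; class and method names are made up):

record Latest(long timestamp, long offset) {}

class TimestampOffsetMap {
    private final java.util.Map<String, Latest> latest = new java.util.HashMap<>();

    // Track, per key, the record that is newest by timestamp (offset breaks ties), so an
    // out-of-order value at a higher offset does not win over a newer-by-time value.
    void observe(String key, long timestamp, long offset) {
        Latest cur = latest.get(key);
        if (cur == null
                || timestamp > cur.timestamp()
                || (timestamp == cur.timestamp() && offset > cur.offset())) {
            latest.put(key, new Latest(timestamp, offset));
        }
    }

    // During cleaning, only the tracked record for each key is retained.
    boolean shouldRetain(String key, long offset) {
        Latest cur = latest.get(key);
        return cur != null && cur.offset() == offset;
    }
}

With the example above, observe("k1", 5, 100) followed by observe("k1", 3, 200) leaves shouldRetain("k1", 100) true and shouldRetain("k1", 200) false, i.e. v0 is cleaned even though it sits at the higher offset.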


I was also not aware of this "guarantee" with regards to broker side time.

As already said: I am not sure if it's a public contract, but based on my experience, people might rely on it as an "implicit contract". -- Maybe somebody else knows whether it's public or not, and whether it would be ok to "break" it.

Let me know if you have any concerns here.

My understanding is: while we cannot make an offset-order guarantee for interleaved writes of different producers, if the topic is configured with "append_time", we "guarantee" (cf. my comment above) timestamp order... If that's the case, it would be an issue if we break this "guarantee".

I am not sure when the broker sets the timestamp for the "append_time" config? If we do it before putting the request into purgatory, we have a problem. However, if we set the timestamp when we actually process the request and do the actual append, it seems there is no issue, as the request that was waiting in purgatory gets the "newest" timestamp and thus cannot introduce out-of-order data.


-Matthias


On 1/24/23 10:44 AM, Justine Olshan wrote:
Hey Matthias,

I have actually never heard of KIP-280 so thanks for bringing it up. That
seems interesting. I wonder how it would work though -- would it build an
offset map with just the latest timestamp for a key? I wonder if ordering
assumptions are baked in there, why not use offset-based compaction.

I was also not aware of this "guarantee" with regards to broker side time.
I think that we can do in order handling for a given producer, but not
across all producers. However, we can't guarantee that anyway.

Let me know if you have any concerns here.

Thanks,
Justine

On Mon, Jan 23, 2023 at 6:33 PM Matthias J. Sax <mj...@apache.org> wrote:

Just a side note about Guozhang's comments about timestamps.

If the producer sets the timestamp, putting the record into purgatory
seems not to be an issue (as already said: for this case we don't
guarantee timestamp order between writes of different producers anyway).
However, if the broker sets the timestamp, the expectation is that there
is no out-of-order data in the partition ever; if we would introduce
out-of-order data for this case (for interleaved writes of different
producers), it seems we would violate the current contract? (To be fair:
I don't know if that's an official contract, but I assume people rely on
this behavior -- and it's "advertised" in many public talks...)

About compaction: there is actually KIP-280, which adds timestamp-based
compaction, a very useful feature for Kafka Streams with regard
to out-of-order data handling. So the impact of introducing
out-of-order data could be larger in scope.


-Matthias


On 1/20/23 4:48 PM, Justine Olshan wrote:
Hey Artem,

I see there is a check for transactional producers. I'm wondering if we
don't handle the epoch overflow case. I'm also not sure it will be a huge
issue to extend to transactional producers, but maybe I'm missing
something.

As for the recovery path -- I think Guozhang's point was if we have a bad
client that repeatedly tries to produce without adding to the transaction
we would do the following:
a) if not fatal, we just fail the produce request over and over
b) if fatal, we fence the producer

Here with B, the issue with the client would be made clear more quickly. I suppose there are some intermediate cases where the issue only occurs sometimes, but I wonder if we should consider how to recover with clients who don't behave as expected anyway.

I think there is a place for the abortable error that we are adding -- just abort and try again. But I think there are also some cases where trying to recover overcomplicates some logic. Especially if we are considering older clients -- there I'm not sure if there's a ton we can do besides fail the batch or fence the producer. With newer clients, we can consider more options for what can just be recovered after aborting. But epochs might be a hard one unless we also want to reset producer ID.
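
For reference, a minimal sketch of what "just abort and try again" typically looks like on the client side, following the standard KafkaProducer usage pattern (the businessLogic callback stands in for the application's sends):

import java.util.function.Consumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

static void runTransaction(KafkaProducer<String, String> producer,
                           Consumer<KafkaProducer<String, String>> businessLogic) {
    while (true) {
        producer.beginTransaction();
        try {
            businessLogic.accept(producer);   // the application's sends (and offset commits) go here
            producer.commitTransaction();
            return;
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            producer.close();                 // fatal: this producer instance cannot continue
            throw e;
        } catch (KafkaException e) {
            producer.abortTransaction();      // abortable: throw the attempt away and retry
        }
    }
}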

Thanks,
Justine



On Fri, Jan 20, 2023 at 3:59 PM Artem Livshits
<alivsh...@confluent.io.invalid> wrote:

   besides the poorly written client case

A poorly written client could create a lot of grief for people who run Kafka brokers :-), so when deciding to make an error fatal I would look at whether there is a reasonable recovery path rather than how often it could happen.  If we have a solid implementation of transactions (which I hope we'll do as a result of this KIP), it would help to recover from a large class of errors by just aborting a transaction, even if the cause of the error is a race condition or the like.

-Artem

On Fri, Jan 20, 2023 at 3:26 PM Justine Olshan
<jols...@confluent.io.invalid>
wrote:

Artem --
I guess the discussion path we were going down is when we expect to see this error. I mentioned that it was hard to come up with cases for when the producer would still be around to receive the error besides the poorly written client case.
If we don't expect to have a producer to receive the response, it sort of makes sense for it to be fatal.

I had some discussion with Jason offline about the epoch being off cases and I'm not sure we could find a ton (outside of produce requests) where we could/should recover. I'd be happy to hear some examples though, maybe I'm missing something.

Thanks,
Justine

On Fri, Jan 20, 2023 at 3:19 PM Artem Livshits
<alivsh...@confluent.io.invalid> wrote:

In general, I'd like to avoid fatal errors as much as possible, in some sense fatal errors just push out recovery logic to the application which either complicates the application or leads to disruption (we've seen cases when a transient broker error could lead to work stoppage when applications need to be manually restarted).  I think we should strive to define recovery logic for most errors (and/or encapsulate it in the Kafka client as much as possible).

One benefit of transactions is that they simplify recovery from errors, pretty much any error (that's not handled transparently by retries in the Kafka client) can be handled by the application via aborting the transaction and repeating the transactional logic again.  One tricky error is an error during commit, because we don't know the outcome.  For commit errors, the recommendation should be to retry the commit until it returns the specific result (committed or aborted).
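
A hand-rolled sketch of that recommendation (this is an application-side loop, not an existing client API; it assumes TimeoutException means "outcome unknown"):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;

static void commitUntilResolved(KafkaProducer<String, String> producer) {
    while (true) {
        try {
            producer.commitTransaction();   // may throw TimeoutException with the outcome unknown
            return;                         // definitely committed
        } catch (TimeoutException e) {
            // Unknown outcome: retry the commit rather than aborting, so we don't redo work
            // for a transaction that actually went through.
        } catch (ProducerFencedException e) {
            throw e;                        // definite outcome (we were fenced); surface as fatal
        }
    }
}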

-Artem

On Fri, Jan 20, 2023 at 2:52 PM Justine Olshan
<jols...@confluent.io.invalid>
wrote:

That's a fair point about other clients.

I think the abortable error case is interesting because I'm curious how other clients would handle this. I assume they would need to implement handling for the error code unless they did something like "any unknown error codes/any codes that aren't x,y,z are retriable." I would hope that unknown error codes were fatal, and if the code was implemented it would abort the transaction. But I will think on this too.

As for InvalidRecord -- you mentioned it was not fatal, but I'm taking a look through the code. We would see this on handling the produce response. If I recall correctly, we check if errors are retriable. I think this error would not be retriable. But I guess the concern here is that it is not enough for just that batch to fail. I guess I hadn't considered fully fencing the old producer but there are valid arguments here why we would want to.

Thanks,
Justine

On Fri, Jan 20, 2023 at 2:35 PM Guozhang Wang <
guozhang.wang...@gmail.com>
wrote:

Thanks Justine for the replies! I agree with most of your thoughts.

Just for 3/7), though I agree for our own AK producer, since we do "nextRequest(boolean hasIncompleteBatches)", we guarantee the end-txn would not be sent until we've effectively flushed. But I was referring to any future bugs or other buggy clients where the same client may get into this situation, in which case we should give the client a clear msg that "you did something wrong, and hence now you should fatally close yourself". What I'm concerned about is that, by seeing an "abortable error" or in some rare cases an "invalid record", the client could not realize "something that's really bad happened". So it's not about adding a new error, it's mainly that in those real buggy situations causing such "should never happen" cases, the errors returned would not be informative enough.

Thinking in other ways, if we believe that for most cases such error codes would not reach the original clients since they would be disconnected or even gone by that time, and only in some rare cases they would still be seen by the sending clients, then why not make them more fatal and more specific than generic.

Guozhang

On Fri, Jan 20, 2023 at 1:59 PM Justine Olshan
<jols...@confluent.io.invalid> wrote:

Hey Guozhang. Thanks for taking a look and for the detailed comments! I'll do my best to address below.

1. I see what you are saying here, but I think I need to look through the sequence of events you mention. Typically we've seen this issue in a few cases.

   One is when we have a producer disconnect when trying to produce. Typically in these cases, we abort the transaction. We've seen that after the markers are written, the disconnection can sometimes cause the request to get flushed to the broker. In this case, we don't need client handling because the producer we are responding to is gone. We just needed to make sure we didn't write to the log on the broker side. I'm trying to think of a case where we do have the client to return to. I'd think the same client couldn't progress to committing the transaction unless the produce request returned right? Of course, there is the incorrectly written clients case. I'll think on this a bit more and let you know if I come up with another scenario when we would return to an active client when the transaction is no longer ongoing.

I was not aware that we checked the result of a send after we commit though. I'll need to look into that a bit more.

2. There were some questions about this in the discussion. The plan is to handle overflow with the mechanism we currently have in the producer. If we try to bump and the epoch will overflow, we actually allocate a new producer ID. I need to confirm the fencing logic on the last epoch (ie, we probably shouldn't allow any records to be produced with the final epoch since we can never properly fence that one).
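
For illustration only, a rough sketch of that overflow handling (these types are made up for the example, not the actual client or broker code):

record ProducerIdAndEpoch(long producerId, short epoch) {}
interface ProducerIdAllocator { long nextProducerId(); }

static ProducerIdAndEpoch bumpEpoch(ProducerIdAndEpoch current, ProducerIdAllocator allocator) {
    // Bumping into the final epoch is avoided: Short.MAX_VALUE is kept behind purely to fence
    // late requests for the old producer ID, and the producer restarts on a new ID at epoch 0.
    if (current.epoch() >= Short.MAX_VALUE - 1) {
        return new ProducerIdAndEpoch(allocator.nextProducerId(), (short) 0);
    }
    return new ProducerIdAndEpoch(current.producerId(), (short) (current.epoch() + 1));
}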

3. I can agree with you that the current error handling is messy. I recall taking a look at your KIP a while back, but I think I mostly saw the section about how the errors were wrapped. Maybe I need to take another look. As for abortable error, the idea was that the handling would be simple -- if this error is seen, the transaction should be aborted -- no other logic about previous state or requests necessary. Is your concern simply about adding new errors? We were hoping to have an error that would have one meaning, and many of the current errors have a history of meaning different things on different client versions. That was the main motivation for adding a new error.

4. This is a good point about record timestamp reordering. Timestamps don't affect compaction, but they do affect retention deletion. For that, Kafka considers the largest timestamp in the segment, so I think a small amount of reordering (hopefully on the order of milliseconds or even seconds) will be ok. We take timestamps from clients so there is already a possibility for some drift and non-monotonically increasing timestamps.
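
As a tiny, purely illustrative check of that point (not broker code): deletion is driven by the largest timestamp in a segment, so a slightly out-of-order record doesn't move the expiry point.

static boolean segmentExpired(long[] recordTimestamps, long retentionMs, long nowMs) {
    // Retention looks at the largest timestamp in the segment, so records that are only
    // slightly out of order (e.g. {10_000, 9_997, 10_002}) don't change when it expires.
    long largestTimestamp = java.util.Arrays.stream(recordTimestamps).max().orElse(Long.MIN_VALUE);
    return nowMs - largestTimestamp > retentionMs;
}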

5. Thanks for catching. The error is there, but it's actually that those fields should be 4+! Due to how the message generator works, I actually have to redefine those fields inside the `AddPartitionsToTxnTransaction` block for it to build correctly. I'll fix it to be correct.

6. Correct -- we will only add the request to purgatory if the cache has no ongoing transaction. I can change the wording to make it clearer that we only place the request in purgatory if we need to contact the transaction coordinator.

7. We did take a look at some of the errors and it was hard to come up with a good one. I agree that InvalidTxnStateException is ideal except for the fact that it hasn't been returned on Produce requests before. The error handling for clients is a bit vague (which is why I opened KAFKA-14439 <https://issues.apache.org/jira/browse/KAFKA-14439>), but the decision we made here was to only return errors that have been previously returned to producers. As for not being fatal, I think part of the theory was that in many cases the producer would be disconnected (see point 1) and this would just be an error to return from the server. I did plan to think about other cases, so let me know if you think of any as well!

Lots to say! Let me know if you have further thoughts!
Justine

On Fri, Jan 20, 2023 at 11:21 AM Guozhang Wang <
guozhang.wang...@gmail.com>
wrote:

Hello Justine,

Thanks for the great write-up! I made a quick pass through it and here are some thoughts (I have not been able to read through this thread, so pardon me if they have overlapped with or been subsumed by previous comments):

First are some meta ones:

1. I think we need to also improve the client's experience once we have this defence in place. More concretely, say a user's producer code is like the following:

future = producer.send();
// producer.flush();
producer.commitTransaction();
future.get();

This results in the order of: a) produce-request sent by producer, b) end-txn-request sent by producer, c) end-txn-response sent back, d) txn-marker-request sent from coordinator to partition leader, e) produce-request finally received by the partition leader. Before this KIP, step e) would be accepted, causing a dangling txn; now it would be rejected in step e), which is good. But from the client's point of view it now becomes confusing, since the `commitTransaction()` returns successfully but the "future" throws an invalid-epoch error, and they are not sure if the transaction did succeed or not. In fact, it "partially succeeded" with some msgs being rejected but others committed successfully.

Of course the easy way to avoid this is to always call "producer.flush()" before commitTxn, and that's what we do ourselves and what we recommend users do. But I suspect not everyone does it. In fact I just checked the javadoc in KafkaProducer and our code snippet does not include a `flush()` call. So I'm thinking maybe we can enforce flushing inside the `commitTxn` code before sending the end-txn request.
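
For what it's worth, a minimal sketch of the safer ordering on the application side (the producer and record are assumed to be created elsewhere; this shows the pattern, not a proposed API change):

import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

static void sendAndCommit(KafkaProducer<String, String> producer,
                          ProducerRecord<String, String> record) throws Exception {
    producer.beginTransaction();
    Future<RecordMetadata> future = producer.send(record);
    producer.flush();   // block until in-flight produce requests have completed
    future.get();       // surface any produce error before the end-txn request goes out
    producer.commitTransaction();
}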

2. I'd like to clarify a bit of the details on "just add partitions to the transaction on the first produce request during a transaction". My understanding is that the partition leader's cache has the producer id / sequence / epoch for the latest txn, either ongoing or completed (upon receiving the marker request from coordinator). When a produce request is received, if

* producer's epoch < cached epoch, or producer's epoch == cached epoch but the latest txn is completed, the leader directly rejects with invalid-epoch.
* producer's epoch > cached epoch, park the request and send an add-partitions request to the coordinator.

In order to do it, does the coordinator need to bump the sequence and reset the epoch to 0 when the next epoch is going to overflow? If there is no need to do so, then how do we handle the (admittedly rare, but still possible) epoch overflow situation?
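
Purely as an illustration of the check described in the bullets above (names are made up; this is not broker code):

enum Decision { REJECT_INVALID_EPOCH, APPEND, PARK_AND_VERIFY }

static Decision checkProduce(short requestEpoch, short cachedEpoch, boolean cachedTxnCompleted) {
    if (requestEpoch < cachedEpoch) {
        return Decision.REJECT_INVALID_EPOCH;          // stale producer epoch
    }
    if (requestEpoch == cachedEpoch) {
        // Same epoch is only valid while the cached transaction is still ongoing.
        return cachedTxnCompleted ? Decision.REJECT_INVALID_EPOCH : Decision.APPEND;
    }
    // Higher epoch: park the request and ask the coordinator to add the partition first.
    return Decision.PARK_AND_VERIFY;
}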

3. I'm a bit concerned about adding a generic "ABORTABLE_ERROR" given we already have a pretty messy error classification and error handling on the producer clients side --- I have a summary about the issues and a proposal to address this in
https://cwiki.apache.org/confluence/display/KAFKA/KIP-691%3A+Enhance+Transactional+Producer+Exception+Handling
-- I understand we do not want to use "UNKNOWN_PRODUCER_ID" anymore, and in fact we intend to deprecate it in KIP-360 and eventually remove it; but I'm wondering whether we can still use specific error codes. E.g. what about "InvalidProducerEpochException", since for new clients the actual reason this would be rejected is indeed because the epoch on the coordinator caused the add-partitions-request from the brokers to be rejected anyways?

4. It seems we put the producer request into purgatory before we ever append the records, while other producers' records may still be appended during that time; and that potentially may result in some re-ordering compared with reception order. I'm not super concerned about it since Kafka does not guarantee reception ordering across producers anyways, but it may make the timestamps of records inside a partition more out-of-order. Are we aware of any scenarios, such as future enhancements on log compaction, that may be affected by this effect?

Below are just minor comments:

5. In "AddPartitionsToTxnTransaction" field of
"AddPartitionsToTxnRequest" RPC, the versions of those inner
fields
are "0-3" while I thought they should be "0+" still?

6. Regarding "we can place the request in a purgatory of sorts
and
check if there is any state for the transaction on the
broker": i
think at this time when we just do the checks against the
cached
state, we do not need to put the request to purgatory yet?

7. This is related to 3) above. I feel using "InvalidRecordException" for older clients may also be a bit confusing, and also it is not fatal -- for old clients it had better be fatal, since this indicates the client is doing something wrong and hence it should be closed. And in general I'd prefer to use error codes with slightly more specific meanings for clients. That being said, I also feel "InvalidProducerEpochException" is not suitable for old versioned clients, and we'd have to pick one that old clients recognize. I'd prefer "InvalidTxnStateException" but that one is supposed to be returned from txn coordinators only today. I'd suggest we do a quick check in the current client's code path and see if that one would be handled if it's from a produce-response, and if yes, use this one; otherwise, use "ProducerFencedException", which is much less meaningful but is still a fatal error.


Thanks,
Guozhang



On Wed, Jan 18, 2023 at 3:01 PM Justine Olshan
<jols...@confluent.io.invalid> wrote:

Yeah -- looks like we already have code to handle bumping the epoch, and when the epoch is Short.MAX_VALUE, we get a new producer ID. Since this is already the behavior, do we want to change it further?

Justine

On Wed, Jan 18, 2023 at 1:12 PM Justine Olshan <
jols...@confluent.io

wrote:

Hey all, just wanted to quickly update and say I've modified the KIP to explicitly mention that AddOffsetCommitsToTxnRequest will be replaced by a coordinator-side (inter-broker) AddPartitionsToTxn implicit request. This mirrors the user partitions and will implicitly add offset partitions to transactions when we commit offsets on them. We will deprecate AddOffsetCommitsToTxnRequest for new clients.

Also to address Artem's comments --
I'm a bit unsure if the changes here will change the previous behavior for fencing producers. In the case you mention in the first paragraph, are you saying we bump the epoch before we try to abort the transaction? I think I need to understand the scenarios you mention a bit better.

As for the second part -- I think it makes sense to have some sort of "sentinel" epoch to signal the epoch is about to overflow (I think we sort of have this value in place in some ways) so we can codify it in the KIP. I'll look into that and try to update soon.

Thanks,
Justine.

On Fri, Jan 13, 2023 at 5:01 PM Artem Livshits
<alivsh...@confluent.io.invalid> wrote:

It's good to know that KIP-588 addressed some of the issues. Looking at the code, it still looks like there are some cases that would result in a fatal error, e.g. PRODUCER_FENCED is issued by the transaction coordinator if the epoch doesn't match, and the client treats it as a fatal error (code in TransactionManager request handling).  If we consider, for example, committing a transaction that returns a timeout but actually succeeds, trying to abort it or re-commit may result in a PRODUCER_FENCED error (because of the epoch bump).

For failed commits, specifically, we need to know the actual outcome, because if we return an error the application may think that the transaction is aborted and redo the work, leading to duplicates.

Re: overflowing epoch.  We could either do it on the TC and return both producer id and epoch (e.g. change the protocol), or signal the client that it needs to get a new producer id.  Checking for max epoch could be a reasonable signal; the value to check should probably be present in the KIP as this is effectively a part of the contract.  Also, the TC should probably return an error if the client didn't change the producer id after hitting max epoch.

-Artem


On Thu, Jan 12, 2023 at 10:31 AM Justine Olshan
<jols...@confluent.io.invalid> wrote:

Thanks for the discussion Artem.

With respect to the handling of fenced producers, we have some behavior already in place. As of KIP-588:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-588%3A+Allow+producers+to+recover+gracefully+from+transaction+timeouts
we handle timeouts more gracefully. The producer can recover.

Produce requests can also recover from epoch fencing by aborting the transaction and starting over.

What other cases were you considering that would cause us to have a fenced epoch but we'd want to recover?

The first point about handling epoch overflows is fair. I think there is some logic we'd need to consider. (ie, if we are one away from the max epoch, we need to reset the producer ID.) I'm still wondering if there is a way to direct this from the response, or if everything should be done on the client side. Let me know if you have any thoughts here.

Thanks,
Justine

On Tue, Jan 10, 2023 at 4:06 PM Artem Livshits
<alivsh...@confluent.io.invalid> wrote:

There are some workflows in the client that are implied by protocol changes, e.g.:

- for new clients, epoch changes with every transaction and can overflow; in old clients this condition was handled transparently, because the epoch was bumped in InitProducerId and it would return a new producer id if the epoch overflows. The new clients would need to implement some workflow to refresh the producer id
- how to handle fenced producers: for new clients the epoch changes with every transaction, so in the presence of failures during commits / aborts the producer could get easily fenced. Old clients would pretty much get fenced when a new incarnation of the producer was initialized with InitProducerId, so it's ok to treat it as a fatal error; the new clients would need to implement some workflow to handle that error, otherwise they could get fenced by themselves
- in particular (as a subset of the previous issue), what would the client do if it got a timeout during commit? The commit could've succeeded or failed

Not sure if this has to be defined in the KIP, as implementing those probably wouldn't require protocol changes, but we have multiple implementations of Kafka clients, so it would probably be good to have some client implementation guidance.  Could also be done as a separate doc.

-Artem

On Mon, Jan 9, 2023 at 3:38 PM Justine Olshan <jols...@confluent.io.invalid> wrote:

Hey all, I've updated the KIP to incorporate Jason's suggestions.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense

1. Use AddPartitionsToTxn + verify flag to check on old clients
2. Updated AddPartitionsToTxn API to support transaction batching
3. Mention IBP bump
4. Mention auth change on new AddPartitionsToTxn version.

I'm planning on opening a vote soon.
Thanks,
Justine

On Fri, Jan 6, 2023 at 3:32 PM Justine Olshan <jols...@confluent.io> wrote:

Thanks Jason. Those changes make sense to me. I will update the KIP.



On Fri, Jan 6, 2023 at 3:31 PM Jason Gustafson
<ja...@confluent.io.invalid>
wrote:

Hey Justine,

I was wondering about compatibility here. When we send requests between brokers, we want to ensure that the receiving broker understands the request (specifically the new fields). Typically this is done via IBP/metadata version.
I'm trying to think if there is a way around it but I'm not sure there is.

Yes. I think we would gate usage of this behind an IBP bump. Does that seem reasonable?

As for the improvements -- can you clarify how the multiple transactional IDs would help here? Were you thinking of a case where we wait/batch multiple produce requests together? My understanding for now was 1 transactional ID and one validation per 1 produce request.

Each call to `AddPartitionsToTxn` is essentially a write to the transaction log and must block on replication. The more we can fit into a single request, the more writes we can do in parallel. The alternative is to make use of more connections, but usually we prefer batching since the network stack is not really optimized for high connection/request loads.

Finally with respect to the authorizations, I think it makes sense to skip topic authorizations, but I'm a bit confused by the "leader ID" field. Wouldn't we just want to flag the request as from a broker (does it matter which one?).

We could also make it version-based. For the next version, we could require CLUSTER auth. So clients would not be able to use the API anymore, which is probably what we want.

-Jason

On Fri, Jan 6, 2023 at 10:43 AM Justine Olshan
<jols...@confluent.io.invalid>
wrote:

As a follow up, I was just thinking about the batching a bit more.
I suppose if we have one request in flight and we queue up the other produce requests in some sort of purgatory, we could send information out for all of them rather than one by one. So that would be a benefit of batching partitions to add per transaction.

I'll need to think a bit more on the design of this part of the KIP, and will update the KIP in the next few days.
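
To illustrate the batching idea only (a hypothetical helper, not broker code): while one verification round trip is in flight, later produce requests for the same transactional id are parked, and the next round trip carries all of their partitions at once.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;

class PendingVerificationBatcher {
    private final Map<String, Set<TopicPartition>> pendingByTxnId = new HashMap<>();

    // Called when a produce request is parked: returns true if a round trip is already in
    // flight for this transactional id, in which case the partition simply rides along on
    // the next request instead of triggering its own.
    synchronized boolean enqueue(String transactionalId, TopicPartition partition) {
        Set<TopicPartition> parts = pendingByTxnId.computeIfAbsent(transactionalId, id -> new HashSet<>());
        boolean alreadyPending = !parts.isEmpty();
        parts.add(partition);
        return alreadyPending;
    }

    // Drains everything accumulated for a transactional id into one batched request.
    synchronized Set<TopicPartition> drain(String transactionalId) {
        Set<TopicPartition> parts = pendingByTxnId.remove(transactionalId);
        return parts != null ? parts : Set.of();
    }
}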

Thanks,
Justine

On Fri, Jan 6, 2023 at 10:22 AM Justine Olshan
<
jols...@confluent.io>
wrote:

Hey Jason -- thanks for the input -- I was just digging a bit deeper into the design + implementation of the validation calls here and what you say makes sense.

I was wondering about compatibility here. When we send requests between brokers, we want to ensure that the receiving broker understands the request (specifically the new fields). Typically this is done via IBP/metadata version.
I'm trying to think if there is a way around it but I'm not sure there is.

As for the improvements -- can you clarify how the multiple transactional IDs would help here? Were you thinking of a case where we wait/batch multiple produce requests together? My understanding for now was 1 transactional ID and one validation per 1 produce request.

Finally with respect to the authorizations, I think it makes sense to skip topic authorizations, but I'm a bit confused by the "leader ID" field. Wouldn't we just want to flag the request as from a broker (does it matter which one?).

I think I want to adopt these suggestions, just had a few questions on the details.

Thanks,
Justine

On Thu, Jan 5, 2023 at 5:05 PM Jason
Gustafson
<ja...@confluent.io.invalid>
wrote:

Hi Justine,

Thanks for the proposal.

I was thinking about the implementation a little bit. In the current proposal, the behavior depends on whether we have an old or new client. For old clients, we send `DescribeTransactions` and verify the result, and for new clients, we send `AddPartitionsToTxn`. We might be able to simplify the implementation if we can use the same request type. For example, what if we bump the protocol version for `AddPartitionsToTxn` and add a `validateOnly` flag? For older versions, we can set `validateOnly=true` so that the request only returns successfully if the partition had already been added. For new versions, we can set `validateOnly=false` and the partition will be added to the transaction. The other slightly annoying thing that this would get around is the need to collect the transaction state for all partitions even when we only care about a subset.

Some additional improvements to consider:

- We can give `AddPartitionsToTxn` better batch support for inter-broker usage. Currently we only allow one `TransactionalId` to be specified, but the broker may get some benefit being able to batch across multiple transactions.
- Another small improvement is skipping topic authorization checks for `AddPartitionsToTxn` when the request is from a broker. Perhaps we can add a field for the `LeaderId` or something like that and require CLUSTER permission when set.

Best,
Jason



On Mon, Dec 19, 2022 at 3:56 PM Jun Rao
<j...@confluent.io.invalid

wrote:

Hi, Justine,

Thanks for the explanation. It makes sense
to
me
now.

Jun

On Mon, Dec 19, 2022 at 1:42 PM Justine
Olshan
<jols...@confluent.io.invalid>
wrote:

Hi Jun,

My understanding of the mechanism is
that
when
we
get to
the
last
epoch,
we
increment to the fencing/last epoch and
if
any
further
requests
come
in
for
this producer ID they are fenced. Then
the
producer
gets a
new
ID
and
restarts with epoch/sequence 0. The
fenced
epoch
sticks
around
for
the
duration of producer.id.expiration.ms
and
blocks
any
late
messages
there.
The new ID will get to take advantage of
the
improved
semantics
around
non-zero start sequences. So I think we
are
covered.

The only potential issue is overloading
the
cache,
but
hopefully
the
improvements (lowered
producer.id.expiration.ms
)
will
help
with
that.
Let
me know if you still have concerns.

Thanks,
Justine

On Mon, Dec 19, 2022 at 10:24 AM Jun Rao
<j...@confluent.io.invalid>
wrote:

Hi, Justine,

Thanks for the explanation.

70. The proposed fencing logic doesn't
apply
when
pid
changes,
is
that
right? If so, I am not sure how
complete
we
are
addressing
this
issue
if
the pid changes more frequently.

Thanks,

Jun



On Fri, Dec 16, 2022 at 9:16 AM
Justine
Olshan
<jols...@confluent.io.invalid>
wrote:

Hi Jun,

Thanks for replying!

70.We already do the overflow
mechanism,
so
my
change
would
just
make
it
happen more often.
I was also not suggesting a new
field
in
the
log,
but
in
the
response,
which would be gated by the client
version.
Sorry if
something
there
is
unclear. I think we are starting to
diverge.
The goal of this KIP is to not
change
to
the
marker
format
at
all.

71. Yes, I guess I was going under
the
assumption
that
the
log
would
just
look at its last epoch and treat it
as
the
current
epoch. I
suppose
we
can
have some special logic that if the
last
epoch
was
on a
marker
we
actually
expect the next epoch or something
like
that. We
just
need
to
distinguish
based on whether we had a
commit/abort
marker.

72.
if the producer epoch hasn't been
bumped
on
the
broker, it seems that the stucked
message
will
fail
the
sequence
validation
and will be ignored. If the producer
epoch
has
been
bumped,
we
ignore
the
sequence check and the stuck message
could
be
appended
to
the
log.
So,
is
the latter case that we want to
guard?

I'm not sure I follow that "the
message
will
fail
the
sequence
validation".
In some of these cases, we had an
abort
marker
(due
to
an
error)
and
then
the late message comes in with the
correct
sequence
number.
This
is a
case
covered by the KIP.
The latter case is actually not
something
we've
considered
here. I
think
generally when we bump the epoch, we
are
accepting
that
the
sequence
does
not need to be checked anymore. My
understanding is
also
that we
don't
typically bump epoch mid transaction
(based
on a
quick
look
at
the
code)
but let me know if that is the case.

Thanks,
Justine

On Thu, Dec 15, 2022 at 12:23 PM Jun
Rao
<j...@confluent.io.invalid

wrote:

Hi, Justine,

Thanks for the reply.

70. Assigning a new pid on int
overflow
seems
a
bit
hacky.
If
we
need a
txn
level id, it will be better to
model
this
explicitly.
Adding a
new
field
would require a bit more work
since
it
requires a
new
txn
marker
format
in
the log. So, we probably need to
guard
it
with an
IBP
or
metadata
version
and document the impact on
downgrade
once
the
new
format
is
written
to
the
log.

71. Hmm, once the marker is
written,
the
partition
will
expect
the
next
append to be on the next epoch.
Does
that
cover
the
case
you
mentioned?

72. Also, just to be clear on the
stucked
message
issue
described
in
the
motivation. With EoS, we also
validate
the
sequence
id
for
idempotency.
So,
with the current logic, if the
producer
epoch
hasn't
been
bumped on
the
broker, it seems that the stucked
message
will
fail
the
sequence
validation
and will be ignored. If the
producer
epoch has
been
bumped, we
ignore
the
sequence check and the stuck
message
could be
appended
to
the
log.
So,
is
the latter case that we want to
guard?

Thanks,

Jun

On Wed, Dec 14, 2022 at 10:44 AM
Justine
Olshan
<jols...@confluent.io.invalid>
wrote:

Matthias — thanks again for
taking
time
to
look
a
this.
You
said:

My proposal was only focusing
to
avoid
dangling

transactions if records are
added
without
registered
partition.
--
Maybe

you can add a few more details
to
the
KIP
about
this
scenario
for
better

documentation purpose?


I'm not sure I understand what
you
mean
here.
The
motivation
section
describes two scenarios about
how
the
record
can be
added
without a
registered partition:


This can happen when a message
gets
stuck
or
delayed
due
to
networking
issues or a network partition,
the
transaction
aborts,
and
then
the
delayed
message finally comes in.


Another way hanging
transactions
can
occur is
that
a
client
is
buggy
and
may somehow try to write to a
partition
before
it
adds
the
partition
to
the
transaction.



For the first example of this
would
it
be
helpful
to
say
that
this
message
comes in after the abort, but
before
the
partition
is
added
to
the
next
transaction so it becomes
"hanging."
Perhaps the
next
sentence
describing
the message becoming part of the
next
transaction
(a
different
case)
was
not properly differentiated.



Jun — thanks for reading the
KIP.

70. The int typing was a
concern.
Currently
we
have a
mechanism
in
place
to
fence the final epoch when the
epoch
is
about to
overflow
and
assign
a
new
producer ID with epoch 0. Of
course,
this
is a
bit
tricky
when it
comes
to
the response back to the client.
Making this a long could be
another
option,
but
I
wonder
are
there
any
implications on changing this
field
if
the
epoch is
persisted
to
disk?
I'd
need to check the usages.

71.This was something Matthias
asked
about
as
well. I
was
considering a
possible edge case where a
produce
request
from
a
new
transaction
somehow
gets sent right after the marker
is
written, but
before
the
producer
is
alerted of the newly bumped
epoch.
In
this
case, we
may
include
this
record
when we don't want to. I suppose
we
could
try
to do
something
client
side
to bump the epoch after sending
an
endTxn as
well
in
this
scenario
—
but
I
wonder how it would work when
the
server is
aborting
based
on
a
server-side
error. I could also be missing
something and
this
scenario
is
actually
not
possible.

Thanks again to everyone reading
and
commenting.
Let
me
know
about
any
further questions or comments.

Justine

On Wed, Dec 14, 2022 at 9:41 AM
Jun
Rao
<j...@confluent.io.invalid

wrote:

Hi, Justine,

Thanks for the KIP. A couple
of
comments.

70. Currently, the producer
epoch
is
an
int.
I am
not
sure
if
it's
enough
to accommodate all
transactions
in
the
lifetime
of
a
producer.
Should
we
change that to a long or add a
new
long
field
like
txnId?

71. "it will write the prepare
commit
message
with
a
bumped
epoch
and
send
WriteTxnMarkerRequests with
the
bumped
epoch."
Hmm,
the
epoch
is
associated
with the current txn right?
So,
it
seems
weird to
write a
commit
message
with a bumped epoch. Should we
only
bump
up
the
epoch
in
EndTxnResponse
and
rename the field to sth like
nextProducerEpoch?

Thanks,

Jun



On Mon, Dec 12, 2022 at 8:54
PM
Matthias
J.
Sax <
mj...@apache.org>
wrote:

Thanks for the background.

20/30: SGTM. My proposal was
only
focusing
to
avoid
dangling
transactions if records are
added
without
registered
partition.
--
Maybe
you can add a few more
details
to
the
KIP
about
this
scenario
for
better
documentation purpose?

40: I think you hit a fair
point
about
race
conditions
or
client
bugs
(incorrectly not bumping the
epoch). The
complexity/confusion
for
using
the bumped epoch I see, is
mainly
for
internal
debugging,
ie,
inspecting
log segment dumps -- it
seems
harder to
reason
about
the
system
for
us
humans. But if we get better
guarantees, it
would
be
worth to
use
the
bumped epoch.

60: as I mentioned already,
I
don't
know the
broker
internals
to
provide
more input. So if nobody
else
chimes
in, we
should
just
move
forward
with your proposal.


-Matthias


On 12/6/22 4:22 PM, Justine
Olshan
wrote:
Hi all,
After Artem's questions
about
error
behavior,
I've
re-evaluated
the
unknown producer ID
exception
and
had
some
discussions
offline.

I think generally it makes
sense
to
simplify
error
handling
in
cases
like
this and the
UNKNOWN_PRODUCER_ID
error
has a
pretty
long
and
complicated
history. Because of this,
I
propose
adding a
new
error
code
ABORTABLE_ERROR
that when encountered by
new
clients
(gated
by
the
produce
request
version)
will simply abort the
transaction.
This
allows
the
server
to
have
some
say
in whether the client
aborts
and
makes
handling
much
simpler.
In
the
future, we can also use
this
error in
other
situations
where
we
want
to
abort the transactions. We
can
even
use on
other
apis.

I've added this to the
KIP.
Let
me
know if
there
are
any
questions
or
issues.

Justine

On Fri, Dec 2, 2022 at
10:22
AM
Justine
Olshan
<
jols...@confluent.io

wrote:

Hey Matthias,


20/30 — Maybe I also
didn't
express
myself
clearly.
For
older
clients
we
don't have a way to
distinguish
between a
previous
and
the
current
transaction since we
don't
have
the
epoch
bump.
This
means
that
a
late
message from the previous
transaction
may be
added to
the
new
one.
With
older clients — we can't
guarantee
this
won't
happen
if we
already
sent
the
addPartitionsToTxn call
(why
we
make
changes
for
the
newer
client)
but
we
can at least gate some by
ensuring
that
the
partition
has
been
added
to
the
transaction. The
rationale
here
is
that
there
are
likely
LESS
late
arrivals
as time goes on, so
hopefully
most
late
arrivals
will
come
in
BEFORE
the
addPartitionsToTxn call.
Those
that
arrive
before
will
be
properly
gated
with the
describeTransactions
approach.

If we take the approach
you
suggested,
ANY
late
arrival
from a
previous
transaction will be
added.
And
we
don't
want
that. I
also
don't
see
any
benefit in sending
addPartitionsToTxn
over
the
describeTxns
call.
They
will
both be one extra RPC to
the
Txn
coordinator.


To be clear — newer
clients
will
use
addPartitionsToTxn
instead
of
the
DescribeTxns.


40)
My concern is that if we
have
some
delay
in
the
client
to
bump
the
epoch,
it could continue to send
epoch
73
and
those
records
would
not
be
fenced.
Perhaps this is not an
issue
if
we
don't
allow
the
next
produce
to
go
through before the EndTxn
request
returns.
I'm
also
thinking
about
cases of
failure. I will need to
think
on
this a
bit.

I wasn't sure if it was
that
confusing.
But
if
we
think it
is,
we
can
investigate other ways.


60)

I'm not sure these are
the
same
purgatories
since
one
is a
produce
purgatory (I was planning
on
using a
callback
rather
than
purgatory)
and
the other is simply a
request
to
append
to
the
log.
Not
sure
we
have
any
structure here for
ordering,
but
my
understanding
is
that
the
broker
could
handle the write request
before
it
hears
back
from
the
Txn
Coordinator.

Let me know if I
misunderstood
something
or
something
was
unclear.

Justine

On Thu, Dec 1, 2022 at
12:15
PM
Matthias
J.
Sax
<
mj...@apache.org

wrote:

Thanks for the details
Justine!

20)

The client side change
for
2
is
removing
the
addPartitions
to
transaction
call. We don't need to
make
this
from
the
producer
to
the
txn
coordinator,
only server side.

I think I did not
express
myself
clearly. I
understand
that
we
can
(and
should) change the
producer
to
not
send
the
`addPartitions`
request
any
longer. But I don't
thinks
it's
requirement
to
change
the
broker?

What I am trying to say
is:
as a
safe-guard
and
improvement
for
older
producers, the partition
leader
can
just
send
the
`addPartitions`
request to the
TX-coordinator
in any
case
--
if
the
old
producer
correctly did send the
`addPartition`
request
to
the
TX-coordinator
already, the
TX-coordinator
can
just
"ignore"
is
as
idempotent.
However,
if the old producer has
a
bug
and
did
forget
to
sent
the
`addPartition`
request, we would now
ensure
that
the
partition
is
indeed
added
to
the
TX and thus fix a
potential
producer bug
(even
if we
don't
get
the
fencing via the bump
epoch).
--
It
seems to
be
a
good
improvement?
Or
is
there a reason to not do
this?



30)

Transaction is ongoing
=
partition
was
added
to
transaction
via
addPartitionsToTxn. We
check
this
with
the
DescribeTransactions
call.
Let
me know if this wasn't
sufficiently
explained
here:

If we do what I propose
in
(20), we
don't
really
need
to
make
this
`DescribeTransaction`
call,
as
the
partition
leader
adds
the
partition
for older clients and we
get
this
check
for
free.


40)

The idea here is that
if
any
messages
somehow
come
in
before
we
get
the
new
epoch to the producer,
they
will be
fenced.
However,
if
we
don't
think
this
is necessary, it can be
discussed

I agree that we should
have
epoch
fencing.
My
question is
different:
Assume we are at epoch
73,
and
we
have
an
ongoing
transaction,
that
is
committed. It seems
natural
to
write the
"prepare
commit"
marker
and
the
`WriteTxMarkerRequest`
both
with
epoch
73,
too,
as
it
belongs
to
the
current transaction. Of
course,
we
now
also
bump
the
epoch
and
expect
the next requests to
have
epoch
74,
and
would
reject
an
request
with
epoch 73, as the
corresponding
TX
for
epoch
73
was
already
committed.

It seems you propose to
write
the
"prepare
commit
marker"
and
`WriteTxMarkerRequest`
with
epoch 74
though,
what
would
work,
but
it
seems confusing. Is
there
a
reason
why
we
would
use
the
bumped
epoch
74
instead of the current
epoch
73?


60)

When we are checking if
the
transaction is
ongoing,
we
need
to
make
a
round
trip from the leader
partition
to
the
transaction
coordinator.
In
the
time
we are waiting for this
message to
come
back,
in
theory
we
could
have
sent
a commit/abort call
that
would
make the
original
result
of
the
check
out of
date. That is why we
can
check
the
leader
state
before
we
write
to
the
log.

Thanks. Got it.

However, is this really
an
issue?
We put
the
produce
request
in
purgatory, so how could
we
process
the
`WriteTxnMarkerRequest`
first?
Don't we need to put the
`WriteTxnMarkerRequest`
into
purgatory,
too,
for this case, and
process
both
request
in-order?
(Again,
my
broker
knowledge is limited and
maybe
we
don't
maintain
request
order
for
this
case, what seems to be
an
issue
IMHO,
and I
am
wondering
if
changing
request handling to
preserve
order
for
this
case
might be
the
cleaner
solution?)



-Matthias




On 11/30/22 3:28 PM,
Artem
Livshits
wrote:
Hi Justine,

I think the interesting
part
is
not in
this
logic
(because
it
tries
to
figure out when
UNKNOWN_PRODUCER_ID is
retriable
and
if
it's
retryable,
it's definitely not
fatal),
but
what
happens
when
this
logic
doesn't
return
'true' and falls
through.
In
the
old
clients
it
seems
to
be
fatal,
if
we
keep the behavior in
the
new
clients,
I'd
expect it
would
be
fatal
as
well.

-Artem

On Tue, Nov 29, 2022 at
11:57
AM
Justine
Olshan

<jols...@confluent.io.invalid

wrote:

Hi Artem and Jeff,


Thanks for taking a
look
and
sorry for
the
slow
response.

You both mentioned the
change
to
handle
UNKNOWN_PRODUCER_ID
errors.
To
be
clear — this error
code
will
only
be
sent
again
when
the
client's
request
version is high enough
to
ensure
we
handle
it
correctly.
The current (Java)
client
handles
this by
the
following
(somewhat
long)
code snippet:

// An UNKNOWN_PRODUCER_ID means that we have lost the producer state on the broker. Depending on the log start
// offset, we may want to retry these, as described for each case below. If none of those apply, then for the
// idempotent producer, we will locally bump the epoch and reset the sequence numbers of in-flight batches from
// sequence 0, then retry the failed batch, which should now succeed. For the transactional producer, allow the
// batch to fail. When processing the failed batch, we will transition to an abortable error and set a flag
// indicating that we need to bump the epoch (if supported by the broker).
if (error == Errors.UNKNOWN_PRODUCER_ID) {
    if (response.logStartOffset == -1) {
        // We don't know the log start offset with this response. We should just retry the request until we get it.
        // The UNKNOWN_PRODUCER_ID error code was added along with the new ProduceResponse which includes the
        // logStartOffset. So the '-1' sentinel is not for backward compatibility. Instead, it is possible for
        // a broker to not know the logStartOffset at when it is returning the response because the partition
        // may have moved away from the broker from the time the error was initially raised to the time the
        // response was being constructed. In these cases, we should just retry the request: we are guaranteed
        // to eventually get a logStartOffset once things settle down.
        return true;
    }

    if (batch.sequenceHasBeenReset()) {
        // When the first inflight batch fails due to the truncation case, then the sequences of all the other
        // in flight batches would have been restarted from the beginning. However, when those responses
        // come back from the broker, they would also come with an UNKNOWN_PRODUCER_ID error. In this case, we should not
        // reset the sequence numbers to the beginning.
        return true;
    } else if (lastAckedOffset(batch.topicPartition).orElse(NO_LAST_ACKED_SEQUENCE_NUMBER) < response.logStartOffset) {
        // The head of the log has been removed, probably due to the retention time elapsing. In this case,
        // we expect to lose the producer state. For the transactional producer, reset the sequences of all
        // inflight batches to be from the beginning and retry them, so that the transaction does not need to
        // be aborted. For the idempotent producer, bump the epoch to avoid reusing (sequence, epoch) pairs
        if (isTransactional()) {
            txnPartitionMap.startSequencesAtBeginning(batch.topicPartition, this.producerIdAndEpoch);
        } else {
            requestEpochBumpForPartition(batch.topicPartition);
        }
        return true;
    }

    if (!isTransactional()) {
        // For the idempotent producer, always retry UNKNOWN_PRODUCER_ID errors. If the batch has the current
        // producer ID and epoch, request a bump of the epoch. Otherwise just retry the produce.
        requestEpochBumpForPartition(batch.topicPartition);
        return true;
    }
}


I was considering
keeping
this
behavior —
but
am
open
to
simplifying
it.



We are leaving changes
to
older
clients
off
the
table
here
since
it
caused
many issues for
clients
in
the
past.
Previously
this
was
a
fatal
error
and
we didn't have the
mechanisms
in
place to
detect
when
this
was
a
legitimate
case vs some bug or
gap
in
the
protocol.
Ensuring
each
transaction
has
its
own epoch should close
this
gap.




And to address Jeff's
second
point:
*does the typical
produce
request
path
append
records
to
local
log
along*

*with the
currentTxnFirstOffset
information?
I
would
like
to
understand*

*when the field is
written
to
disk.*


Yes, the first produce
request
populates
this
field
and
writes
the
offset
as part of the record
batch
and
also
to
the
producer
state
snapshot.
When
we reload the records
on
restart
and/or
reassignment,
we
repopulate
this
field with the
snapshot
from
disk
along
with
the
rest
of
the
producer
state.

Let me know if there
are
further
comments
and/or
questions.

Thanks,
Justine

On Tue, Nov 22, 2022
at
9:00
PM
Jeff
Kim

<jeff....@confluent.io.invalid

wrote:

Hi Justine,

Thanks for the KIP! I
have
two
questions:

1) For new clients,
we
can
once
again
return
an
error
UNKNOWN_PRODUCER_ID
for sequences
that are non-zero
when
there
is
no
producer
state
present
on
the
server.
This will indicate we
missed
the
0
sequence
and
we
don't
yet
want
to
write
to the log.

I would like to
understand
the
current
behavior
to
handle
older
clients,
and if there are any
changes
we
are
making.
Maybe
I'm
missing
something,
but we would want to
identify
whether we
missed
the 0
sequence
for
older
clients, no?

2) Upon returning
from
the
transaction
coordinator, we
can
set
the
transaction
as ongoing on the
leader
by
populating
currentTxnFirstOffset
through the typical
produce
request
handling.

does the typical
produce
request
path
append
records
to
local
log
along
with the
currentTxnFirstOffset
information?
I
would
like
to
understand
when the field is
written
to
disk.

Thanks,
Jeff


On Tue, Nov 22, 2022
at
4:44
PM
Artem
Livshits
<
alivsh...@confluent.io
.invalid>
wrote:

Hi Justine,

Thank you for the
KIP.
I
have
one
question.

5) For new clients,
we
can
once
again
return
an
error
UNKNOWN_PRODUCER_ID

I believe we had
problems
in the
past
with
returning
UNKNOWN_PRODUCER_ID
because it was
considered
fatal
and
required
client
restart.
It
would
be
good to spell out
the
new
client
behavior
when
it
receives
the
error.

-Artem

On Tue, Nov 22, 2022
at
10:00 AM
Justine
Olshan

<jols...@confluent.io.invalid>
wrote:

Thanks for taking a
look
Matthias.
I've
tried
to
answer
your
questions
below:

10)

Right — so the
hanging
transaction
only
occurs
when
we
have
a
late
message
come in and the
partition
is
never
added
to
a
transaction
again.
If
we
never add the
partition
to
a
transaction,
we
will
never
write
a
marker
and
never advance the
LSO.

If we do end up
adding
the
partition
to
the
transaction
(I
suppose
this
can
happen before or
after
the
late
message
comes
in)
then
we
will
include
the
late message in the
next
(incorrect)
transaction.

So perhaps it is
clearer
to
make
the
distinction
between
messages
that
eventually get
added
to
the
transaction
(but
the
wrong
one)
or
messages
that never get
added
and
become
hanging.


20)

The client side
change
for
2 is
removing
the
addPartitions
to
transaction
call. We don't need
to
make
this
from
the
producer
to
the
txn
coordinator,
only server side.


In my opinion, the
issue
with
the
addPartitionsToTxn
call
for
older
clients
is that we don't
have
the
epoch
bump,
so
we
don't
know
if
the
message
belongs to the
previous
transaction or
this
one.
We
need
to
check
if
the
partition has been
added
to
this
transaction.
Of
course,
this
means
we
won't completely
cover
the
case
where
we
have a
really
late
message
and
we
have added the
partition
to
the new
transaction,
but
that's
unfortunately
something we will
need
the
new
clients
to
cover.


30)

Transaction is
ongoing
=
partition
was
added to
transaction
via
addPartitionsToTxn.
We
check
this
with
the
DescribeTransactions
call.
Let
me know if this
wasn't
sufficiently
explained
here:



























https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense#KIP890:TransactionsServerSideDefense-EnsureOngoingTransactionforOlderClients(3)


40)

The idea here is
that
if
any
messages
somehow
come
in
before
we
get
the
new
epoch to the
producer,
they
will be
fenced.
However,
if
we
don't
think
this
is necessary, it
can
be
discussed


50)

It should be
synchronous
because
if we
have
an
event
(ie,
an
error)
that
causes us to need
to
abort
the
transaction,
we
need
to
know
which
partitions to send
transaction
markers
to.
We
know
the
partitions
because
we added them to
the
coordinator
via
the
addPartitionsToTxn
call.
Previously we have
had
asynchronous
calls
in
the
past
(ie,
writing
the
commit markers when
the
transaction is
completed)
but
often
this
just
causes confusion as
we
need to
wait
for
some
operations
to
complete.
In
the
writing commit
markers
case,
clients
often
see

CONCURRENT_TRANSACTIONs
error messages and
that
can be
confusing.
For
that
reason,
it
may
be
simpler to just
have
synchronous
calls —
especially
if
we
need
to
block
on
some operation's
completion
anyway
before
we
can
start
the
next
transaction. And
yes, I
meant
coordinator. I
will
fix
that.


60)

When we are
checking
if
the
transaction
is
ongoing,
we
need
to
make
a
round
trip from the
leader
partition
to
the
transaction
coordinator.
In
the
time
we are waiting for
this
message to
come
back,
in
theory
we
could
have
sent
a commit/abort call
that
would
make
the
original
result
of
the
check
out
of
date. That is why
we
can
check
the
leader
state
before
we
write
to
the
log.


I'm happy to update
the
KIP if
some of
these
things
were
not
clear.
Thanks,
Justine

On Mon, Nov 21,
2022
at
7:11 PM
Matthias
J.
Sax <
mj...@apache.org

wrote:

Thanks for the
KIP.

Couple of
clarification
questions
(I
am
not a
broker
expert
do
maybe
some question are
obvious
for
others,
but
not
for
me
with
my
lack
of
broker knowledge).



(10)

The delayed
message
case
can
also
violate
EOS
if
the
delayed
message
comes in after the
next
addPartitionsToTxn
request
comes
in.
Effectively
we
may see a message
from a
previous
(aborted)
transaction
become
part
of
the
next transaction.

What happens if
the
message
come
in
before
the
next
addPartitionsToTxn
request? It seems
the
broker
hosting
the
data
partitions
won't
know
anything about it
and
append
it to
the
partition,
too?
What
is
the
difference between
both
cases?

Also, it seems a
TX
would
only
hang,
if
there
is no
following
TX
that
is
either committer
or
aborted?
Thus,
for
the
case
above,
the
TX
might
actually not hang
(of
course,
we
might
get
an
EOS
violation
if
the
first
TX was aborted and
the
second
committed,
or
the
other
way
around).


(20)

Of course, 1 and
2
require
client-side
changes, so
for
older
clients,
those approaches
won’t
apply.

For (1) I
understand
why a
client
change
is
necessary,
but
not
sure
why
we need a client
change
for
(2).
Can
you
elaborate?
--
Later
you
explain
that we should
send
a
DescribeTransactionRequest,
but I
am
not
sure
why?
Can't we not just
do
an
implicit
AddPartiitonToTx,
too?
If
the
old
producer correctly
registered
the
partition
already,
the
TX-coordinator
can just ignore it
as
it's an
idempotent
operation?


(30)

To cover older
clients,
we
will
ensure a
transaction
is
ongoing
before
we write to a
transaction

Not sure what you
mean
by
this?
Can
you
elaborate?


(40)

[the
TX-coordinator]
will
write
the
prepare
commit
message
with
a
bumped
epoch and send
WriteTxnMarkerRequests
with
the
bumped
epoch.

Why do we use the
bumped
epoch for
both?
It
seems
more
intuitive
to
use
the current epoch,
and
only
return
the
bumped
epoch
to
the
producer?


(50) "Implicit
AddPartitionToTransaction"

Why does the
implicitly
sent
request
need
to
be
synchronous?
The
KIP
also says

in case we need
to
abort
and
need to
know
which
partitions

What do you mean
by
this?


we don’t want to
write
to it
before
we
store
in
the
transaction
manager

Do you mean
TX-coordinator
instead of
"manager"?


(60)

For older clients
and
ensuring
that
the
TX
is
ongoing,
you
describe a
race condition. I
am
not
sure
if I
can
follow
here.
Can
you
elaborate?



-Matthias



On 11/18/22 1:21
PM,
Justine
Olshan
wrote:
Hey all!

I'd like to
start a
discussion
on my
proposal
to
add
some
server-side
checks on
transactions
to
avoid
hanging
transactions.
I
know
this
has
been
an issue for some
time,
so I
really
hope
this
KIP
will
be
helpful
for
many
users of EOS.

The KIP includes
changes
that
will
be
compatible
with
old
clients
and
changes to
improve
performance
and
correctness
on
new
clients.

Please take a
look
and
leave
any
comments
you
may
have!

KIP:




























https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense
JIRA:

https://issues.apache.org/jira/browse/KAFKA-14402

Thanks!
Justine





































