Hello everyone,

I've been investigating some transaction-related issues in a very
problematic cluster. Besides finding some interesting issues, I had some
ideas about how transactional producer behavior could be improved.

My suggestion, in short, is: when the transactional producer encounters an
error which doesn't necessarily mean that the in-flight request was
processed (for example a client-side timeout), the producer should not send
an EndTxnRequest on abort, but should instead bump the producer epoch.

The longer description of the issue I found, and how I arrived at the
suggestion:

First, the description of the issue. When I say that the cluster is "very
problematic", I mean all kinds of different issues, be it infra (disks and
network) or throughput (high-volume producers without fine-tuning).
In this cluster, Kafka transactions are widely used by many producers, and
partitions get "stuck" frequently (a few times every week).

The exact meaning of a partition being "stuck" is this:

On the client side (a rough sketch of this flow follows the list):
1. A transactional producer sends X batches to a partition in a single
transaction
2. Out of the X batches, the last few get sent, but time out due to the
delivery timeout config
3. producer.flush() is unblocked due to all batches being "finished"
4. Based on the errors reported in the producer.send() callback,
producer.abortTransaction() is called
5. Then producer.close() is also invoked with a 5s timeout (this
application does not reuse the producer instances optimally)
6. The transactional.id of the producer is never reused (it was randomly
generated)
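
To make the sequence above concrete, here is a rough, self-contained sketch
of the client-side flow in Java. The topic name, bootstrap server, payloads
and timeout values are placeholders for illustration; this is not the actual
application code.

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;
    import java.util.UUID;
    import java.util.concurrent.atomic.AtomicBoolean;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class StuckPartitionClientFlow {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // placeholder
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
            // Step 6: random transactional.id, never reused
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString());
            // Step 2: batches can expire client side after delivery.timeout.ms
            props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");

            List<byte[]> payloads = List.of("1".getBytes(), "2".getBytes(), "3".getBytes());
            AtomicBoolean sawError = new AtomicBoolean(false);

            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
            try {
                producer.initTransactions();
                producer.beginTransaction();
                // Step 1: X batches to a single partition in one transaction
                for (byte[] payload : payloads) {
                    producer.send(new ProducerRecord<>("events", 0, null, payload),
                            (metadata, exception) -> {
                                // Step 4 input: e.g. a TimeoutException on the last batches
                                if (exception != null) {
                                    sawError.set(true);
                                }
                            });
                }
                // Step 3: unblocks once every batch is "finished" (delivered or expired)
                producer.flush();
                if (sawError.get()) {
                    producer.abortTransaction();  // today this sends EndTxnRequest(ABORT)
                } else {
                    producer.commitTransaction();
                }
            } finally {
                // Step 5: short close timeout; the producer instance is then thrown away
                producer.close(Duration.ofSeconds(5));
            }
        }
    }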

On the partition leader side (what appears in the log segment of the
partition):
1. The batches sent by the producer are all appended to the log
2. But the ABORT marker of the transaction is appended before the last 1
or 2 batches of the transaction

On the transaction coordinator side (what appears in the transaction state
partition):
The transactional.id is present with the Empty state.
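
For reference, on clusters and clients new enough for KIP-664 (2.8+, so not
the 2.5 cluster described here), the coordinator-side state can be queried
with the Admin API. A minimal sketch, with a placeholder transactional id:

    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.TransactionDescription;

    public class CoordinatorStateCheck {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // placeholder
            try (Admin admin = Admin.create(props)) {
                TransactionDescription description = admin
                        .describeTransactions(List.of("my-transactional-id"))      // placeholder id
                        .description("my-transactional-id")
                        .get();
                // In the stuck scenario above this reports the Empty state, i.e. the
                // coordinator does not know about any in-progress transaction.
                System.out.println(description.state());
            }
        }
    }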

These events result in the following (a sketch of how the blocked LSO can
be observed from the outside follows this list):
1. The partition leader treats the first batch after the ABORT marker as
the first message of a new transaction of the same producer id + epoch.
(The LSO is blocked at this point.)
2. The transaction coordinator is not aware of any in-progress transaction
for this producer, and thus never aborts the transaction, not even after
transaction.timeout.ms passes.
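
To show what a blocked LSO looks like from the outside: with
isolation.level=read_committed, KafkaConsumer#endOffsets returns the last
stable offset, while with read_uncommitted it returns the high watermark.
A minimal sketch (topic and broker are placeholders) that makes the stuck
gap visible:

    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class LsoGapCheck {
        public static void main(String[] args) {
            TopicPartition tp = new TopicPartition("events", 0);     // placeholder partition
            long lso = endOffset(tp, "read_committed");              // last stable offset
            long highWatermark = endOffset(tp, "read_uncommitted");  // high watermark
            // A gap that never shrinks, even after transaction.timeout.ms, is the
            // "stuck" symptom described above.
            System.out.println("LSO=" + lso + " HW=" + highWatermark);
        }

        private static long endOffset(TopicPartition tp, String isolationLevel) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // placeholder
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, isolationLevel);
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                Map<TopicPartition, Long> offsets = consumer.endOffsets(List.of(tp));
                return offsets.get(tp);
            }
        }
    }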

This is happening with Kafka 2.5 running in the cluster; producer versions
range between 2.0 and 2.6.
I scanned through a lot of tickets, and I believe that this issue is not
specific to these versions and could happen with the newest versions as
well. If I'm mistaken, some pointers would be appreciated.

Assuming that the issue could occur with any version, I believe it boils
down to one oversight on the client side:
When a request fails without a definitive response (e.g. a delivery
timeout), the client cannot assume that the request is "finished" and
simply abort the transaction. If the request is still in flight, and the
EndTxnRequest and then the WriteTxnMarkerRequest get sent and processed
before it, the client has violated the contract.
This could be avoided by providing more information to the partition
leader. Right now, a new transactional batch signals the start of a new
transaction, and there is no way for the partition leader to tell whether
the batch is actually an out-of-order message from an already-aborted
transaction.
In a naive and wasteful protocol, we could add a unique transaction id to
each batch and marker, so that the leader could refuse batches which arrive
after the control marker of their transaction. But instead of changing the
log format and the protocol, we can achieve the same by bumping the
producer epoch.

Bumping the epoch has a similar effect to "changing the transaction id" -
the in-progress transaction will be aborted with a bumped producer epoch,
telling the partition leader about the producer epoch change. From this
point on, any batches sent with the old epoch will be refused by the leader
due to the fencing mechanism. It doesn't really matter how many of those
batches get appended to the log and how many are refused - the transaction
is aborted either way - but the out-of-order message cannot occur, and
cannot block the LSO indefinitely.
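
To illustrate why the bump is enough, here is the fencing idea as a
conceptual sketch. This is not the real broker code (the actual check lives
in the broker's producer state handling), and the names and the exception
type are only illustrative:

    import org.apache.kafka.common.errors.InvalidProducerEpochException;

    final class FencingSketch {
        // Conceptual version of the check the partition leader already performs on
        // every transactional batch; names are illustrative, not the broker's code.
        static void validateAppend(long producerId, short batchEpoch, short lastSeenEpoch) {
            if (batchEpoch < lastSeenEpoch) {
                // After the abort is written with the bumped epoch, a late batch from
                // the old epoch is rejected here instead of being appended as the
                // start of a "new" transaction that would pin the LSO.
                throw new InvalidProducerEpochException("Producer " + producerId
                        + " tried to append with epoch " + batchEpoch
                        + " after the epoch was bumped to " + lastSeenEpoch);
            }
        }
    }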

My suggestion is that the TransactionManager inside the producer should
keep track of what type of errors were encountered by the batches of the
transaction, and categorize them along the lines of "definitely completed"
and "might not be completed". When the transaction goes into an abortable
state and there is at least one batch in the "might not be completed"
category, the EndTxnRequest should be skipped and an epoch bump should be
sent instead.
As for what type of error counts as "might not be completed", I can only
think of client-side timeouts.
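
Roughly what I have in mind, as a sketch - the names below are invented for
illustration and do not match the real TransactionManager internals:

    import org.apache.kafka.common.errors.TimeoutException;

    // Illustrative sketch only; not the actual TransactionManager API.
    final class AbortStrategyTracker {
        private boolean sawPossiblyUnresolvedBatch = false;

        // Called for every batch that finished with an error.
        void recordBatchError(Exception exception) {
            // A client-side timeout gives no definitive answer about whether the
            // broker processed the batch, so it counts as "might not be completed".
            if (exception instanceof TimeoutException) {
                sawPossiblyUnresolvedBatch = true;
            }
        }

        // Decides how the abort should be carried out.
        boolean shouldBumpEpochInsteadOfEndTxn() {
            // If every batch "definitely completed", the usual EndTxnRequest(ABORT) is
            // safe. Otherwise skip EndTxn and bump the producer epoch (InitProducerId
            // with the current producer id + epoch, as introduced by KIP-360) so any
            // batch still in flight gets fenced by the partition leader.
            return sawPossiblyUnresolvedBatch;
        }
    }

Erring on the side of the epoch bump is safe here, because the transaction
is being aborted anyway; the only thing the bump changes is that late
batches from the old epoch get fenced instead of starting a phantom
transaction.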

I believe this is a relatively small change (it only affects the client
lib), but it would help avoid some corrupted states in Kafka transactions.

Looking forward to your input. If it seems like a sane idea, I'll go ahead
and submit a PR for it as well.

Thanks in advance,
Daniel
