Thanks for the feedback. I created
https://issues.apache.org/jira/browse/KAFKA-14053 and started working on a
PR.

Luke, for the workaround, we used the transaction admin tool released in
3.0 to "abort" these hanging transactions manually.
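
For reference, the same operation is exposed programmatically by the 3.0
Admin API (a minimal sketch; the partition and the producer/coordinator
identifiers below are placeholders that in practice come from the broker,
e.g. from the tool's hanging-transaction output):

    import java.util.Properties;
    import org.apache.kafka.clients.admin.AbortTransactionSpec;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.common.TopicPartition;

    public class AbortHangingTxn {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker:9092");
            try (Admin admin = Admin.create(props)) {
                AbortTransactionSpec spec = new AbortTransactionSpec(
                        new TopicPartition("stuck-topic", 0),
                        1234L,      // producerId (placeholder)
                        (short) 5,  // producerEpoch (placeholder)
                        7);         // coordinatorEpoch (placeholder)
                admin.abortTransaction(spec).all().get();
            }
        }
    }
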
Beyond that, the cluster health itself has to be stabilized. This issue
popped up most frequently around times when some partitions went into a
few-minute window of unavailability. With practically infinite retries on
the producer side, the last retry could still be in flight when the
delivery timeout was triggered on the client side. We reduced the retries
and increased the delivery timeout to avoid such situations.
Still, the issue can occur in other scenarios, for example a client
queueing up many batches in the producer buffer, causing those batches to
spend most of the delivery timeout window in client memory.
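
For context, the tuning is just standard producer configs; a minimal
sketch with illustrative values (not a recommendation):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txn-id-1");
    // Fewer retries, so a last retry cannot still be in flight long
    // after the batch was already expired locally...
    props.put(ProducerConfig.RETRIES_CONFIG, 5);
    // ...and a longer delivery timeout, so client-side expiry fires only
    // after any in-flight request would have been answered. Note that
    // time spent queued in the producer buffer also counts against it,
    // and delivery.timeout.ms must be >= request.timeout.ms + linger.ms.
    props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 300_000);
    props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30_000);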

Thanks,
Daniel

Luke Chen <show...@gmail.com> wrote (Thu, Jul 7, 2022, 5:13):

> Hi Daniel,
>
> Thanks for reporting the issue, and the investigation.
> I'm curious: what was your workaround for this issue?
>
> I agree with Artem, it makes sense. Please file a bug in JIRA.
> And looking forward to your PR! :)
>
> Thank you.
> Luke
>
> On Thu, Jul 7, 2022 at 3:07 AM Artem Livshits
> <alivsh...@confluent.io.invalid> wrote:
>
> > Hi Daniel,
> >
> > What you say makes sense. Could you file a bug and put this info
> > there so that it's easier to track?
> >
> > -Artem
> >
> > On Wed, Jul 6, 2022 at 8:34 AM Dániel Urbán <urb.dani...@gmail.com>
> > wrote:
> >
> > > Hello everyone,
> > >
> > > I've been investigating some transaction-related issues in a very
> > > problematic cluster. Besides finding some interesting issues, I had
> > > some ideas about how transactional producer behavior could be
> > > improved.
> > >
> > > My suggestion in short is: when the transactional producer
> > > encounters an error which doesn't necessarily mean that the
> > > in-flight request was processed (for example a client-side timeout),
> > > the producer should not send an EndTxnRequest on abort, but instead
> > > it should bump the producer epoch.
> > >
> > > A longer description of the issue I found, and how I came to the
> > > suggestion:
> > >
> > > First, the description of the issue. When I say that the cluster is
> > > "very problematic", I mean all kinds of different issues, be it
> > > infra (disks and network) or throughput (high-volume producers
> > > without fine-tuning).
> > > In this cluster, Kafka transactions are widely used by many
> > > producers. And in this cluster, partitions get "stuck" frequently (a
> > > few times every week).
> > >
> > > The exact meaning of a partition being "stuck" is this:
> > >
> > > On the client side (a producer-API sketch follows the list):
> > > 1. A transactional producer sends X batches to a partition in a
> > > single transaction
> > > 2. Out of the X batches, the last few get sent, but are timed out
> > > due to the delivery timeout config
> > > 3. producer.flush() is unblocked because all batches are "finished"
> > > 4. Based on the errors reported in the producer.send() callback,
> > > producer.abortTransaction() is called
> > > 5. Then producer.close() is also invoked with a 5s timeout (this
> > > application does not reuse producer instances optimally)
> > > 6. The transactional.id of the producer is never reused (it was
> > > randomly generated)
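> > >
> > > In producer API terms, the flow is roughly the following (a minimal
> > > sketch with placeholder topic, key/value, and config names - not our
> > > actual application code):
> > >
> > >     // Assumes props holds bootstrap.servers, transactional.id and
> > >     // the serializer configs.
> > >     // imports: org.apache.kafka.clients.producer.*, java.time.Duration,
> > >     //          java.util.concurrent.atomic.AtomicBoolean
> > >     KafkaProducer<String, String> producer = new KafkaProducer<>(props);
> > >     AtomicBoolean sendFailed = new AtomicBoolean(false);
> > >     final int X = 10;                        // batches per transaction
> > >     producer.initTransactions();
> > >     producer.beginTransaction();
> > >     for (int i = 0; i < X; i++) {            // (1) send X batches
> > >         producer.send(new ProducerRecord<>("topic", "key", "value"),
> > >                 (metadata, exception) -> {
> > >                     // (2) the last few batches expire client-side with
> > >                     // a TimeoutException once delivery.timeout.ms passes
> > >                     if (exception != null) sendFailed.set(true);
> > >                 });
> > >     }
> > >     producer.flush();                        // (3) unblocks, all "finished"
> > >     if (sendFailed.get()) {
> > >         producer.abortTransaction();         // (4) abort on callback errors
> > >     } else {
> > >         producer.commitTransaction();
> > >     }
> > >     producer.close(Duration.ofSeconds(5));   // (5) 5s close timeout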
> > >
> > > On the partition leader side (what appears in the log segment of the
> > > partition):
> > > 1. The batches sent by the producer are all appended to the log
> > > 2. But the ABORT marker of the transaction was appended before the
> > > last 1 or 2 batches of the transaction
> > >
> > > On the transaction coordinator side (what appears in the
> > > transaction state partition):
> > > The transactional.id is present with the Empty state.
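> > >
> > > For what it's worth, with a new enough admin client and brokers that
> > > support the DescribeTransactions API (3.0+), this coordinator-side
> > > state can also be checked programmatically; a sketch with a
> > > placeholder transactional id:
> > >
> > >     // imports: org.apache.kafka.clients.admin.*,
> > >     //          java.util.Collections, java.util.Properties
> > >     Properties props = new Properties();
> > >     props.put("bootstrap.servers", "broker:9092");
> > >     try (Admin admin = Admin.create(props)) {
> > >         TransactionDescription desc = admin
> > >                 .describeTransactions(Collections.singleton("my-txn-id"))
> > >                 .description("my-txn-id")
> > >                 .get();
> > >         System.out.println(desc.state()); // reports Empty in our case
> > >     }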
> > >
> > > Together, these result in the following:
> > > 1. The partition leader handles the first batch after the ABORT
> > > marker as the first message of a new transaction of the same
> > > producer id + epoch. (The LSO is blocked at this point.)
> > > 2. The transaction coordinator is not aware of any in-progress
> > > transaction of the producer, and thus never aborts the transaction,
> > > not even after transaction.timeout.ms passes.
> > >
> > > This is happening with Kafka 2.5 running in the cluster; producer
> > > versions range between 2.0 and 2.6.
> > > I scanned through a lot of tickets, and I believe that this issue is
> > > not specific to these versions, and could happen with the newest
> > > versions as well. If I'm mistaken, some pointers would be
> > > appreciated.
> > >
> > > Assuming that the issue can occur with any version, I believe it
> > > boils down to one oversight on the client side:
> > > When a request fails without a definitive response (e.g. a delivery
> > > timeout), the client cannot assume that the request is "finished"
> > > and simply abort the transaction. If the produce request is still in
> > > flight when the EndTxnRequest and then the WriteTxnMarkerRequest get
> > > sent and processed, the contract is violated by the client.
> > > This could be avoided by providing more information to the partition
> > > leader. Right now, a new transactional batch signals the start of a
> > > new transaction, and there is no way for the partition leader to
> > > decide whether the batch is an out-of-order message.
> > > In a naive and wasteful protocol, we could add a unique transaction
> > > id to each batch and marker, meaning that the leader would be
> > > capable of refusing batches which arrive after the control marker of
> > > the transaction. But instead of changing the log format and the
> > > protocol, we can achieve the same by bumping the producer epoch.
> > >
> > > Bumping the epoch has a similar effect to "changing the transaction
> > > id" - the in-progress transaction will be aborted with a bumped
> > > producer epoch, telling the partition leader about the producer
> > > epoch change. From this point on, any batches sent with the old
> > > epoch will be refused by the leader due to the fencing mechanism. It
> > > doesn't really matter how many batches get appended to the log and
> > > how many are refused - this is an aborted transaction anyway - but
> > > the out-of-order message cannot occur, and cannot block the LSO
> > > indefinitely.
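> > >
> > > To illustrate the fencing rule, a simplified sketch of the
> > > leader-side check (a hypothetical helper, not the actual broker
> > > code):
> > >
> > >     // import org.apache.kafka.common.errors.ProducerFencedException;
> > >     static void checkProducerEpoch(long producerId, short batchEpoch,
> > >                                    short currentEpoch) {
> > >         // Batches sent with the pre-bump epoch arrive here after the
> > >         // abort and are refused instead of being treated as the start
> > >         // of a new transaction.
> > >         if (batchEpoch < currentEpoch) {
> > >             throw new ProducerFencedException(
> > >                     "Producer " + producerId + " used a fenced epoch");
> > >         }
> > >     }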
> > >
> > > My suggestion is that the TransactionManager inside the producer
> > > should keep track of what type of errors were encountered by the
> > > batches of the transaction, and categorize them along the lines of
> > > "definitely completed" and "might not be completed". When the
> > > transaction goes into an abortable state, and there is at least one
> > > batch with "might not be completed", the EndTxnRequest should be
> > > skipped, and an epoch bump should be sent instead.
> > > As for what type of error counts as "might not be completed", I can
> > > only think of client-side timeouts.
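> > >
> > > A rough sketch of the classification I have in mind (all names below
> > > are made up for illustration - this is not the actual
> > > TransactionManager code):
> > >
> > >     // import org.apache.kafka.common.errors.TimeoutException;
> > >     enum BatchOutcome { DEFINITELY_COMPLETED, MIGHT_NOT_BE_COMPLETED }
> > >
> > >     BatchOutcome classify(Exception batchError) {
> > >         // A client-side timeout only means we stopped waiting - the
> > >         // broker may still process the request later.
> > >         if (batchError instanceof TimeoutException)
> > >             return BatchOutcome.MIGHT_NOT_BE_COMPLETED;
> > >         return BatchOutcome.DEFINITELY_COMPLETED;
> > >     }
> > >
> > >     // anyBatch() and the two senders below are hypothetical helpers.
> > >     void onAbortableError() {
> > >         if (anyBatch(BatchOutcome.MIGHT_NOT_BE_COMPLETED)) {
> > >             bumpProducerEpoch();  // e.g. a new InitProducerId round;
> > >                                   // fences all batches of the old epoch
> > >         } else {
> > >             sendEndTxnRequest();  // safe: all batches definitely completed
> > >         }
> > >     }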
> > >
> > > I believe this is a relatively small change (it only affects the
> > > client lib), but it helps avoid some corrupt states in Kafka
> > > transactions.
> > >
> > > Looking forward to your input. If it seems like a sane idea, I'll
> > > go ahead and submit a PR for it as well.
> > >
> > > Thanks in advance,
> > > Daniel
> > >
> >
>
