Hey Matthias -- I'm quite sure you should  *not* do what we're doing in
https://github.com/apache/kafka/pull/15315. That's definitely a bad hack,
and IIUC the only reason we accepted it was because the choice was between
implementing a hacky temporary fix and blocking the entire release while we
figured out the "right" way to fix this. Sadly, it seems like we never
followed up after the release was cut, and AFAICT everyone forgot about
this so-called "temporary" hack... as predicted
<https://github.com/apache/kafka/pull/15315#discussion_r1478629131>.

Anyways, as a general rule, you should never try to catch and swallow an
IllegalStateException. You never know if you might be swallowing an actual
problem and messing up your application. It's better to just make sure it
doesn't get thrown in the first place.

In this specific case, it seems like the "illegal state" that both you and
Kafka Streams are hitting comes from calling #abortTransaction after a
timeout. So it sounds to me like you should just add a separate catch block
for TimeoutException and retry the #commitTransaction.
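To make the retry shape concrete, here's a minimal, untested sketch. It uses toy stand-ins for the Kafka classes (a hypothetical FlakyProducer plus local TimeoutException/KafkaException classes) so it runs on its own; with a real KafkaProducer you'd catch org.apache.kafka.common.errors.TimeoutException around producer.commitTransaction() in exactly the same way:

```java
// Toy stand-ins (NOT the real Kafka classes) so the control flow is runnable.
class TimeoutException extends RuntimeException {}
class KafkaException extends RuntimeException {}

class FlakyProducer {
    private int attempts = 0;
    // Simulates a commit whose broker ack doesn't arrive until the third try.
    void commitTransaction() {
        if (++attempts < 3) throw new TimeoutException();
    }
    int attempts() { return attempts; }
}

public class CommitRetry {
    // On timeout, retry the SAME operation; never fall through to abortTransaction().
    static int commitWithRetry(FlakyProducer producer, int maxRetries) {
        for (int i = 0; i <= maxRetries; i++) {
            try {
                producer.commitTransaction();
                return producer.attempts(); // committed
            } catch (TimeoutException e) {
                // The commit may still complete on the broker; retrying is safe.
            }
        }
        // Out of retries: with a real producer, the only remaining option is close().
        throw new KafkaException();
    }

    public static void main(String[] args) {
        int attempts = commitWithRetry(new FlakyProducer(), 5);
        System.out.println("committed after " + attempts + " attempts");
    }
}
```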

For the full explanation as to why you need to retry the #commitTxn (as
opposed to just starting a new transaction), check out this paragraph in
the #commitTransaction javadocs:

> Note that this method will raise TimeoutException if the transaction cannot
> be committed before expiration of max.block.ms, but this does not mean
> the request did not actually reach the broker. In fact, it only indicates
> that we cannot get the acknowledgement response in time, so it's up to the
> application's logic to decide how to handle timeouts. Additionally, it will
> raise InterruptException if interrupted. It is safe to retry in either
> case, but it is not possible to attempt a different operation (such as
> abortTransaction) since the commit may already be in the progress of
> completing. If not retrying, the only option is to close the producer.


It does seem like the exception handling example is objectively incorrect,
since TimeoutException extends KafkaException and will, as you experienced,
end up in the KafkaException catch block that tries to abort the transaction.
I get that people might want to handle timeouts differently, and it's
tricky since they can also be thrown from abortTransaction and have to be
handled in the same way (i.e. you can only retry the same API or close the
producer). But the example is very misleading and should absolutely be
updated to include explicit TimeoutException handling, in my opinion.
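For instance, the updated example might look something like this (an untested sketch, not proposed official wording -- it's just the current javadoc recipe with an inner try/catch around the commit):

```java
try {
    producer.beginTransaction();
    for (int i = 0; i < 100; i++)
        producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
    try {
        producer.commitTransaction();
    } catch (TimeoutException e) {
        // The commit may already be completing on the broker. It is only safe
        // to retry the same call (ideally in a bounded loop) or to close the
        // producer -- calling abortTransaction() here is what throws the
        // IllegalStateException from KAFKA-16221.
        producer.commitTransaction();
    }
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // We can't recover from these exceptions, so our only option is to close the producer and exit.
    producer.close();
} catch (KafkaException e) {
    // For all other exceptions, just abort the transaction and try again.
    producer.abortTransaction();
}
producer.close();
```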

Would you be interested in doing a PR to fix the Producer javadocs and
improve the exception handling example?

On Thu, May 2, 2024 at 4:22 AM <matthias.kr...@gmx.de.invalid> wrote:

> Hi,
>
> Thanks for your work and sorry to bother you.
>
> My code gets the same IllegalStateException from KafkaProducer as Kafka
> Stream gets in KAFKA-16221:
>
> java.lang.IllegalStateException: Cannot attempt operation
> `abortTransaction` because the previous call to `commitTransaction` timed
> out and must be retried
>     at
> org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1109)
>     at
> org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:266)
>     at
> org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:835)
>
> I have followed the recipe from
> https://kafka.apache.org/37/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
> for error handling for a transactional producer, i.e.
>
>  try {
>      producer.beginTransaction();
>      for (int i = 0; i < 100; i++)
>          producer.send(new ProducerRecord<>("my-topic",
> Integer.toString(i), Integer.toString(i)));
>      producer.commitTransaction();
>  } catch (ProducerFencedException | OutOfOrderSequenceException |
> AuthorizationException e) {
>      // We can't recover from these exceptions, so our only option is to
> close the producer and exit.
>      producer.close();
>  } catch (KafkaException e) {
>      // For all other exceptions, just abort the transaction and try again.
>      producer.abortTransaction();
>  }
>  producer.close();
>
> Kafka Streams has solved KAFKA-16221 by introducing a hack (
> https://github.com/apache/kafka/pull/15315), but plans a clean solution.
>
> Does that mean that above recipe is outdated?
> Is there really no simple, clean solution how to do the error handling?
> Should I use the solution from https://github.com/apache/kafka/pull/15315
> and wait for what Kafka Streams comes up next for the clean solution?
>
> Kind regards, Matthias Kraaz
>
>
