Danil Shkodin created KAFKA-18673:
-------------------------------------
Summary: Provide means to gracefully update Producer transactional
id mapping state in case of lasting inactivity
Key: KAFKA-18673
URL: https://issues.apache.org/jira/browse/KAFKA-18673
Project: Kafka
Issue Type: Improvement
Reporter: Danil Shkodin
Please consider adding a way to prevent a transactional _Producer_ instance
from expiring.
For services that run 24/7 but write transactional messages to Kafka only
sparsely, there are several options to keep the program highly available.
The first is what Spring
[does|https://docs.spring.io/spring-kafka/reference/kafka/transactions.html#overview]:
rotating transactional producers at intervals shorter than the expiration
timeout.
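{code:java}
// Replace the producer before transactional.id.expiration.ms elapses.
// producer, settings, timeout (a java.time.Duration) and logger are fields.
void fixTransactionalIdExpiration() {
    try {
        producer.close(timeout);
    } catch (Exception error) {
        logger.warn("...", error);
    }
    producer = null;
    try {
        producer = new KafkaProducer<>(settings);
    } catch (Exception error) {
        logger.warn("...", error);
        // handle failure
        return;
    }
    try {
        // Registers the transactional.id with the coordinator and obtains a producer id
        producer.initTransactions();
    } catch (Exception error) {
        logger.warn("...", error);
        // Release the half-initialized producer, then handle failure
        producer.close(timeout);
        producer = null;
        return;
    }
}{code}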
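To run the rotation on a timer, something like the following could work. It is a minimal sketch, assuming the method above is passed in as a _Runnable_; the _TransactionalIdKeeper_ name and the choice of a third of the expiration timeout as the period are illustrative assumptions, not Spring or kafka-clients behavior.

{code:java}
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of kafka-clients): runs a refresh task, such as
// fixTransactionalIdExpiration(), well before transactional.id.expiration.ms elapses.
final class TransactionalIdKeeper implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    // A third of the timeout: if one run fails, the next one still starts
    // inside the window measured from the last successful run (assuming runs are short).
    static Duration refreshPeriod(Duration expiration) {
        if (expiration.isZero() || expiration.isNegative()) {
            throw new IllegalArgumentException("expiration must be positive");
        }
        return expiration.dividedBy(3);
    }

    void start(Duration expiration, Runnable refresh) {
        // scheduleWithFixedDelay rejects a zero delay, so use at least 1 ms.
        long periodMs = Math.max(1, refreshPeriod(expiration).toMillis());
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                refresh.run();
            } catch (RuntimeException error) {
                // An exception escaping the task would cancel all later runs.
                System.err.println("transactional.id refresh failed: " + error);
            }
        }, periodMs, periodMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
{code}

With the broker default _transactional.id.expiration.ms_ of 7 days (604800000 ms), the period works out to 56 hours.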
Another, similar option is to act periodically as well, but instead of
reconnecting, to write an empty record transactionally.
{code:java}
void fixTransactionalIdExpiration() {
    try {
        producer.beginTransaction();
        // A record must actually be sent for the transaction to refresh the mapping.
        var topic = "project_prefix.__dummy_topic";
        var message = new ProducerRecord<>(topic, (String) null, (String) null);
        producer.send(message);
        producer.abortTransaction();
        // or producer.commitTransaction(); it does not matter
    } catch (Exception error) {
        logger.warn("...", error);
        // handle failure
    }
}{code}
Personally, I dislike having to introduce a service topic. For me, this
inelegance outweighs the trouble of reconnecting.
Surprisingly, producing an empty transaction does not prevent expiration.
[Probably|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L840]
this is because a guard in the transaction manager skips the actual update
of the transactional producer mapping when there is nothing to write.
{code:java}
void fixTransactionalIdExpiration() {
    // Does not work: the producer will still get fenced once
    // transactional.id.expiration.ms elapses.
    try {
        producer.beginTransaction();
        producer.commitTransaction(); // empty transaction, nothing was sent
    } catch (Exception error) {
        logger.warn("...", error);
    }
}{code}
It is worth noting that client code may run one of these periodic fixes
conditionally, only after a period of inactivity, i.e. when there have been no
successful _send()_ or _sendOffsetsToTransaction()_ calls for, say, 6 to 24 hours.
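The inactivity condition could be tracked along these lines. This is a sketch under assumptions: _InactivityTracker_ is a hypothetical class, the caller passes the current time explicitly (which keeps the logic easy to test), and _recordActivity_ is assumed to be called after each successful _send()_ or _sendOffsetsToTransaction()_.

{code:java}
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical tracker (not part of kafka-clients): records transactional activity
// and reports when a keep-alive (rotation or dummy transaction) is due.
final class InactivityTracker {
    private final Duration idleThreshold;
    private final AtomicReference<Instant> lastActivity;

    InactivityTracker(Duration idleThreshold, Instant start) {
        this.idleThreshold = idleThreshold;
        this.lastActivity = new AtomicReference<>(start);
    }

    // Call after a successful send() or sendOffsetsToTransaction().
    // Keeps the latest timestamp, so out-of-order calls never move it backwards.
    void recordActivity(Instant now) {
        lastActivity.accumulateAndGet(now, (prev, next) -> next.isAfter(prev) ? next : prev);
    }

    // True once nothing has been recorded for at least idleThreshold.
    boolean keepAliveDue(Instant now) {
        return Duration.between(lastActivity.get(), now).compareTo(idleThreshold) >= 0;
    }
}
{code}

A periodic check would then run one of the fixes above only when _keepAliveDue(Instant.now())_ returns true, and call _recordActivity_ once the fix succeeds.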
The last, and most obvious, option is to let the operation fail and react to the error.
{code:java}
void sendMessage(Message message) {
    try {
        producer.beginTransaction();
        producer.send(message.to());
        producer.commitTransaction();
    } catch (InvalidPidMappingException error) {
        // transactional.id expired: reconnect (close, recreate, initTransactions), retry
    } catch (Exception error) {
        // handle failure
    }
}{code}
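The reconnect-and-retry branch could be factored into a small helper. A minimal sketch, assuming a hypothetical _RetryOnExpiredId_ class: the expired-id exception type is passed in (in the Kafka case, _org.apache.kafka.common.errors.InvalidPidMappingException_), and the cause chain is inspected too, in case a client version wraps the error (an assumption, not verified against a specific release). The action should look up the current producer on each call rather than capture the instance that expired.

{code:java}
// Hypothetical helper (not part of kafka-clients): runs a transactional action and,
// if it fails because the transactional.id expired, reconnects once and retries.
final class RetryOnExpiredId {
    // Like Runnable, but allowed to throw checked exceptions.
    interface Action {
        void run() throws Exception;
    }

    static void runWithReconnect(Action action, Runnable reconnect,
                                 Class<? extends Throwable> expiredType) throws Exception {
        try {
            action.run();
        } catch (Exception error) {
            if (!causedBy(error, expiredType)) {
                throw error; // unrelated failure: let the caller handle it
            }
            // e.g. close the old producer, create a new one, call initTransactions()
            reconnect.run();
            // Retry exactly once; a second failure propagates to the caller.
            action.run();
        }
    }

    // True if the error or any of its causes is an instance of type.
    static boolean causedBy(Throwable error, Class<? extends Throwable> type) {
        for (Throwable current = error; current != null; current = current.getCause()) {
            if (type.isInstance(current)) {
                return true;
            }
        }
        return false;
    }
}
{code}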
A dedicated method that explicitly expresses the intent to refresh the
producer's _transactional.id_ would parallel the _Consumer_ polling mechanism
and make it clear to new kafka-clients users that prolonged transactional
_Producer_ inactivity must be addressed.
Search terms for this issue:
InvalidPidMappingException: The producer attempted to use a producer id which
is not currently assigned to its transactional id
transactional.id.expiration.ms
--
This message was sent by Atlassian Jira
(v8.20.10#820010)