[
https://issues.apache.org/jira/browse/KAFKA-18673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Alex Tran reassigned KAFKA-18673:
---------------------------------
Assignee: Alex Tran
> Provide means to gracefully update Producer transactional id mapping state in
> case of lasting inactivity
> -------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-18673
> URL: https://issues.apache.org/jira/browse/KAFKA-18673
> Project: Kafka
> Issue Type: Improvement
> Reporter: Danil Shkodin
> Assignee: Alex Tran
> Priority: Major
>
> Please consider adding a way to prevent a transactional _Producer_ instance
> from expiring.
> Currently, those who run 24/7 services that write transactional messages to
> Kafka only sparsely have several options for keeping the program highly
> available.
> The first is what Spring
> [does|https://docs.spring.io/spring-kafka/reference/kafka/transactions.html#overview]:
> rotating transactional producers at intervals shorter than the expiration
> timeout.
> {code:java}
> void fixTransactionalIdExpiration() {
>     try {
>         producer.close(timeout);
>     } catch (Exception error) {
>         logger.warn("...", error);
>     }
>     producer = null;
>     try {
>         producer = new KafkaProducer<>(settings);
>     } catch (Exception error) {
>         logger.warn("...", error);
>         // handle failure
>         return;
>     }
>     try {
>         producer.initTransactions();
>     } catch (Exception error) {
>         logger.warn("...", error);
>         // close producer and clean up, handle failure
>         return;
>     }
> }{code}
> A similar option also acts periodically, but writes an empty record
> transactionally instead of reconnecting.
> {code:java}
> void fixTransactionalIdExpiration() {
>     try {
>         producer.beginTransaction();
>         var topic = "project_prefix.__dummy_topic";
>         var message = new ProducerRecord<>(topic, (String) null, (String) null);
>         producer.send(message);
>         producer.abortTransaction();
>         // or producer.commitTransaction(); does not matter
>     } catch (Exception error) {
>         logger.warn("...", error);
>         // handle failure
>     }
> }{code}
> Personally, I do not like having to introduce a service topic. For me, this
> inelegance outweighs the trouble of reconnecting.
> Surprisingly, producing an empty transaction does not prevent expiration.
> [Probably|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L840],
> there is a guard in the transaction manager that skips the actual update of
> the transactional producer mapping when there is nothing to write.
> {code:java}
> void fixTransactionalIdExpiration() {
>     // does not work
>     // producer will still get fenced upon transactional.id.expiration.ms
>     try {
>         producer.beginTransaction();
>         producer.commitTransaction();
>     } catch (Exception error) {}
> }{code}
> Note that client code may run one of these periodic fixes conditionally, only
> after a period of inactivity, meaning no successful _send()_ or
> _sendOffsetsToTransaction()_ for, say, 6 to 24 hours.
> The last and most obvious option is to let it fail and react to the error.
> {code:java}
> void sendMessage(Message message) {
>     try {
>         producer.beginTransaction();
>         producer.send(message.to());
>         producer.commitTransaction();
>     } catch (InvalidPidMappingException error) {
>         // reconnect, retry
>     } catch (Exception error) {
>         // handle failure
>     }
> }{code}
> A dedicated method that explicitly expresses the intent to refresh the
> producer's _transactional.id_ would align with the _Consumer_ polling
> mechanism and signal to new kafka-clients users that prolonged transactional
> _Producer_ inactivity must be addressed.
> Keywords to make this issue easier to find in search:
> InvalidPidMappingException: The producer attempted to use a producer id which
> is not currently assigned to its transactional id
> transactional.id.expiration.ms
>
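The conditional variant described in the quoted issue (run a periodic fix only after a stretch of inactivity) needs some way to track the last successful write. Below is a minimal sketch, assuming a hypothetical helper class named IdleRefreshPolicy that is not part of kafka-clients. The clock is injected so the logic can be exercised without waiting for hours; in a real service System::nanoTime would be a natural choice.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

/**
 * Hypothetical helper (not a kafka-clients API): remembers when the producer
 * was last active and reports when a keep-alive action is due.
 */
final class IdleRefreshPolicy {
    private final long idleThresholdNanos;
    private final LongSupplier nanoClock;
    private long lastActivityNanos;

    IdleRefreshPolicy(Duration idleThreshold, LongSupplier nanoClock) {
        this.idleThresholdNanos = idleThreshold.toNanos();
        this.nanoClock = nanoClock;
        // Creation counts as activity, so a fresh policy is never due.
        this.lastActivityNanos = nanoClock.getAsLong();
    }

    /** Call after each successful write and after each keep-alive action. */
    synchronized void recordActivity() {
        lastActivityNanos = nanoClock.getAsLong();
    }

    /** True when no activity was recorded for at least the idle threshold. */
    synchronized boolean refreshDue() {
        return nanoClock.getAsLong() - lastActivityNanos >= idleThresholdNanos;
    }
}
```

A caller would invoke recordActivity() after each successful send() or sendOffsetsToTransaction(), and a scheduled task would run one of the fixes from the issue only when refreshDue() returns true. The threshold is the caller's choice; it only needs to stay comfortably below transactional.id.expiration.ms.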
--
This message was sent by Atlassian Jira
(v8.20.10#820010)