[ 
https://issues.apache.org/jira/browse/KAFKA-18673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alex Tran reassigned KAFKA-18673:
---------------------------------

    Assignee: Alex Tran

> Provide means to gracefully update Producer transactional id mapping state in 
> case of lasting inactivity
> -------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-18673
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18673
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Danil Shkodin
>            Assignee: Alex Tran
>            Priority: Major
>
> Please consider adding some method of preventing a transactional _Producer_ 
> instance from expiring, that is, from having its _transactional.id_ mapping 
> dropped by the broker after _transactional.id.expiration.ms_ of inactivity.
> Currently, for those who run 24/7 services that write transactional messages 
> to Kafka only sparsely, there are several options to keep the program highly 
> available.
> The first is what Spring 
> [does|https://docs.spring.io/spring-kafka/reference/kafka/transactions.html#overview]:
> rotating transactional producers at intervals shorter than the expiration 
> timeout.
> {code:java}
> void fixTransactionalIdExpiration() {
>   try {
>     producer.close(timeout);
>   } catch (Exception error) {
>     logger.warn("...", error);
>   }
>   producer = null;
>   try {
>     producer = new KafkaProducer<>(settings);
>   } catch (Exception error) {
>     logger.warn("...", error);
>     // handle failure
>     return;
>   }
>   try {
>     producer.initTransactions();
>   } catch (Exception error) {
>     logger.warn("...", error);
>     // release the half-initialized producer, then handle failure
>     producer.close(timeout);
>     producer = null;
>     return;
>   }
> }{code}
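The rotation approach above can be sketched, independently of kafka-clients, as a small holder that hands out the current instance and replaces it once it is older than a configured maximum age. This is a minimal sketch under stated assumptions: the class `AgeRotatingHolder` and its constructor are hypothetical (not part of kafka-clients or Spring), the factory is assumed to return a producer on which `initTransactions()` has already been called, and `get()` is assumed to be called only between transactions, never while one is open. `KafkaProducer` implements `Closeable`, so it satisfies the `AutoCloseable` bound.

{code:java}
import java.time.Duration;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical helper (not a kafka-clients or Spring API): keeps one instance,
// e.g. a transactional producer, and recreates it once it exceeds maxAge.
final class AgeRotatingHolder<T extends AutoCloseable> {
    private final Supplier<T> factory;      // assumed to create AND initialize a fresh instance
    private final long maxAgeMillis;        // keep well below transactional.id.expiration.ms
    private final LongSupplier clockMillis; // injectable clock, e.g. System::currentTimeMillis
    private T current;
    private long createdAtMillis;

    AgeRotatingHolder(Supplier<T> factory, Duration maxAge, LongSupplier clockMillis) {
        this.factory = factory;
        this.maxAgeMillis = maxAge.toMillis();
        this.clockMillis = clockMillis;
    }

    // Call only between transactions: rotation closes the previous instance.
    synchronized T get() {
        long now = clockMillis.getAsLong();
        if (current != null && now - createdAtMillis >= maxAgeMillis) {
            closeQuietly(current);
            current = null;
        }
        if (current == null) {
            current = factory.get(); // if this throws, the next get() simply tries again
            createdAtMillis = now;
        }
        return current;
    }

    private static void closeQuietly(AutoCloseable resource) {
        try {
            resource.close();
        } catch (Exception error) {
            // Best effort, like the logger.warn(...) branch in the snippet above.
        }
    }
}
{code}

For example, the factory might build a `KafkaProducer` from the existing `settings` and call `initTransactions()` before returning it, with `maxAge` set comfortably below the broker's `transactional.id.expiration.ms` (604800000 ms, i.e. 7 days, by default). Because the age check happens on access, an idle producer is replaced right before its next use rather than on a timer, and a producer is never used after it has been idle longer than its age.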
> Another, similar option is to also act periodically, but to write an empty 
> record transactionally instead of recreating the producer.
> {code:java}
> void fixTransactionalIdExpiration() {
>   try {
>     producer.beginTransaction();
>     var topic = "project_prefix.__dummy_topic";
>     var message = new ProducerRecord<>(topic, (String) null, (String) null);
>     producer.send(message);
>     producer.abortTransaction();
>     // or producer.commitTransaction(); does not matter
>   } catch (Exception error) {
>     logger.warn("...", error);
>     // handle failure
>   }
> }{code}
> Personally, I do not like having to introduce a service topic; this 
> inelegance outweighs the reconnection troubles for me.
> Surprisingly, producing an empty transaction does not prevent expiration. 
> [Probably|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L840],
>  there is a guard in the transaction manager that skips the actual update 
> of the transactional producer mapping when there is nothing to write.
> {code:java}
> void fixTransactionalIdExpiration() {
>   // does not work
>   // producer will still get fenced upon transactional.id.expiration.ms
>   try {
>     producer.beginTransaction();
>     producer.commitTransaction();
>   } catch (Exception error) {}
> } {code}
> It is worth noting that client code may run one of these periodic fixes 
> conditionally, only after a period of inactivity, that is, when there has been 
> no successful _send()_ or _sendOffsetsToTransaction()_ for, say, 6 to 24 hours.
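The conditional variant described above can be sketched as a small inactivity tracker that records the last successful transactional activity and runs a keep-alive action only when the idle time exceeds a threshold. This is a minimal sketch, assuming a hypothetical `InactivityGuard` class (not a kafka-clients API) and assuming the caller calls `recordActivity()` after each successfully committed transaction; the keep-alive itself would be one of the `fixTransactionalIdExpiration()` variants.

{code:java}
import java.time.Duration;
import java.util.function.LongSupplier;

// Hypothetical helper (not a kafka-clients API): tracks the last successful
// transactional activity and runs a keep-alive only after a period of idleness.
final class InactivityGuard {
    private final long idleThresholdMillis;
    private final LongSupplier clockMillis; // injectable clock, e.g. System::currentTimeMillis
    private long lastActivityMillis;

    InactivityGuard(Duration idleThreshold, LongSupplier clockMillis) {
        this.idleThresholdMillis = idleThreshold.toMillis();
        this.clockMillis = clockMillis;
        this.lastActivityMillis = clockMillis.getAsLong(); // treat creation as activity
    }

    // Assumed to be called after every successfully committed transaction.
    synchronized void recordActivity() {
        lastActivityMillis = clockMillis.getAsLong();
    }

    // Runs keepAlive only if idle for at least the threshold; returns whether it ran.
    // If keepAlive throws, the exception propagates and the timestamp is unchanged,
    // so the next call will try again.
    synchronized boolean runIfIdle(Runnable keepAlive) {
        if (clockMillis.getAsLong() - lastActivityMillis < idleThresholdMillis) {
            return false;
        }
        keepAlive.run();
        recordActivity(); // reentrant: same monitor
        return true;
    }
}
{code}

In a service, `runIfIdle` might be invoked periodically from a `ScheduledExecutorService`, with the threshold well below `transactional.id.expiration.ms`. The keep-alive must not interleave with a business transaction on the same producer, since a producer has at most one open transaction at a time.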
> The last, and most obvious, option is to let it fail and react to the error.
> {code:java}
> void sendMessage(Message message) {
>   try {
>     producer.beginTransaction();
>     producer.send(message.to());
>     producer.commitTransaction();
>   } catch (InvalidPidMappingException error) {
>     // reconnect, retry
>   } catch (Exception error) {
>     // handle failure
>   }
> }{code}
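The "reconnect, retry" branch above can be sketched as a generic recover-and-retry-once helper. This is a minimal sketch under stated assumptions: `RecoverAndRetry` is hypothetical, in real use `recoverable` would be `InvalidPidMappingException.class` (from `org.apache.kafka.common.errors`, a `RuntimeException` subclass), `recover` would close and recreate the producer and call `initTransactions()`, and `action` is assumed to wrap the whole begin/send/commit sequence and to re-read the current producer field on each attempt so the retry uses the new instance.

{code:java}
import java.util.function.Supplier;

// Hypothetical helper (not a kafka-clients API): runs an action and, if it fails
// with the given recoverable exception type, runs a recovery step and retries once.
final class RecoverAndRetry {
    private RecoverAndRetry() {}

    static <T> T call(Supplier<T> action,
                      Class<? extends RuntimeException> recoverable,
                      Runnable recover) {
        try {
            return action.get();
        } catch (RuntimeException error) {
            if (!recoverable.isInstance(error)) {
                throw error; // any other failure is left to the caller
            }
            recover.run();       // e.g. close, recreate, initTransactions()
            return action.get(); // a second failure propagates to the caller
        }
    }
}
{code}

Retrying exactly once keeps the failure mode simple: if the freshly initialized producer also fails, the problem is not expiration and should surface to the generic error handling.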
> Having a dedicated method that explicitly expresses the intent to refresh the 
> producer's _transactional.id_ would be analogous to the _Consumer_ polling 
> mechanism and would make it evident to new kafka-clients users that prolonged 
> transactional _Producer_ inactivity must be addressed.
> Keywords to help others find this issue:
> InvalidPidMappingException: The producer attempted to use a producer id which 
> is not currently assigned to its transactional id
> transactional.id.expiration.ms
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
