Hi,
Sorry for the delay in responding.

I was interested to see how you attacked the problem of acknowledging 
transactionally. The direction is definitely along the right lines.

The tricky part of this is that the KafkaProducer is transactional but 
KafkaShareConsumer is not. For regular consumers, there is a similar 
distinction: KafkaProducer is transactional but KafkaConsumer is not. So, there 
are methods in KafkaProducer to add offsets to transactions, but the 
KafkaConsumer is entirely unaware of the use of transactions. KafkaConsumer 
does not keep much state beyond the current position of its assigned 
partitions, and the correct behaviour for handling the consumer when a 
transaction fails is to throw it away and make a new one. Even so, we find that 
developers who write transactional applications which use KafkaProducer and 
KafkaConsumer together don't always get the error handling right.

KafkaShareConsumer maintains a lot more state, such as the acknowledgements 
map, and updates to this need to be coordinated as part of the transaction. Also, 
the acknowledgements will require both the group ID and member ID to be sent to 
the broker.

I think there are two broad ways to achieve this:

a) We could make KafkaShareConsumer transactional in its own right. This is 
probably a bit controversial in terms of Kafka APIs. The code might look like 
this:

shareConsumer.beginTransaction()
shareConsumer.acknowledge(inputRecord)
shareConsumer.reply("outputTopic", outputRecord)
shareConsumer.commitTransaction()

This basically introduces a very simple producer capability into the share 
consumer, and also makes the share consumer transactional. Of course, in the 
no-producer case you described, the controversial reply() method would not be 
required.

b) We could somehow get KafkaProducer to call KafkaShareConsumer at transaction 
end in order to update the state correctly.

I think what you've done in the sequence diagrams is along the right lines. I 
think we need the KafkaShareConsumer to send the AddShareAcksToTxn to the 
share-partition leader on the instructions of the KafkaProducer, or at least 
assist in the building of it. Otherwise, the record states in the 
share-partition leader will not be updated and the record acquisition locks 
could time out.

Maybe the code for your CTP use case would look like the following:

producer.beginTransaction()
producer.send(outputRecord)
producer.sendShareAcknowledgementsToTransaction(shareConsumer)
producer.commitTransaction()

Or perhaps:

producer.beginTransaction()
producer.send(outputRecord)
txnAckData = shareConsumer.prepareAcknowledgementsForTransaction()
producer.sendShareAcknowledgementsToTransaction(txnAckData)
producer.commitTransaction()

The sendShareAcknowledgementsToTransaction method calls a method on the share 
consumer providing the transactional ID, producer ID and producer epoch which 
will be needed to participate in the Kafka transaction protocol. The share 
consumer would then call the share-partition leader, move the acknowledged 
records into a committing state, and then the SPL would call the share 
coordinator with a transactional variant of WriteShareGroupState.

Then the transaction coordinator would call the share coordinator during 
transaction commit to commit the transactional updates to the share-group state.
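
To make the record-state flow above concrete, here is a toy in-memory model of 
a single share-partition. It is only a sketch and not Kafka code: the class 
SharePartitionModel, its method names and the state names other than 
"committing" are all hypothetical, and the choice to make records available 
again when the transaction aborts is my assumption, not something the KIP has 
decided.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model of one share-partition's record states. Hypothetical, not a Kafka API. */
class SharePartitionModel {
    enum State { AVAILABLE, ACQUIRED, COMMITTING, ACKNOWLEDGED }

    // Per-offset record state; offsets not present are treated as AVAILABLE.
    private final Map<Long, State> states = new HashMap<>();
    // Offsets staged under each transactional ID until that transaction ends.
    private final Map<String, Set<Long>> pending = new HashMap<>();

    /** A share consumer fetches the record and takes the acquisition lock. */
    void acquire(long offset) {
        if (stateOf(offset) != State.AVAILABLE) {
            throw new IllegalStateException("offset " + offset + " is not available");
        }
        states.put(offset, State.ACQUIRED);
    }

    /** Models the transactional acknowledgement: ACQUIRED becomes COMMITTING. */
    void addAckToTxn(String transactionalId, long offset) {
        if (stateOf(offset) != State.ACQUIRED) {
            throw new IllegalStateException("offset " + offset + " is not acquired");
        }
        states.put(offset, State.COMMITTING);
        pending.computeIfAbsent(transactionalId, k -> new HashSet<>()).add(offset);
    }

    /** Models the transaction coordinator completing the transaction. */
    void endTxn(String transactionalId, boolean commit) {
        Set<Long> offsets = pending.remove(transactionalId);
        if (offsets == null) {
            return; // nothing was staged under this transactional ID
        }
        // Assumption: an abort makes the records available for redelivery.
        State result = commit ? State.ACKNOWLEDGED : State.AVAILABLE;
        for (long offset : offsets) {
            states.put(offset, result);
        }
    }

    State stateOf(long offset) {
        return states.getOrDefault(offset, State.AVAILABLE);
    }
}
```

The point of the intermediate state is that a record which has been 
acknowledged in an open transaction is neither redelivered nor finally 
acknowledged until the transaction coordinator reports the outcome.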


This is extending the transactional capabilities of Kafka quite considerably. I 
think the best path here is to flesh out the second of these options further. 
Let me know whether you'd like me to help with the next revision of the KIP.

Thanks,
Andrew

On 2026/03/01 17:39:38 Shekhar Prasad Rajak via dev wrote:
> Thank you Andrew for the review.
> It took some time to analyse the review comments and understand the Kafka 
> producer 2PC implementation in depth. All the points are valid, and I have 
> updated the KIP to adopt the RPCs that already exist and to use the same 
> methods for the CTP (Consume-Transform-Produce) pattern, both with Kafka as 
> the sink and for the no-producer use case.
> Please have a look: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1289+Support+Hierarchical+Transactional+Acknowledgments+for+Share+Groups
>  
> 
> 
> 
> Regards,Shekhar Prasad Rajak,
> 
> 
> 
> 
>  
> 
>     On Thursday 26 February 2026 at 03:32:36 am GMT+5:30, Andrew Schofield 
> <[email protected]> wrote:  
>  
>  Hi Shekhar,
> Thanks for the KIP. Adding transactional support for acknowledgements is one 
> of the future features I'd like to see for share groups, so I'm glad you've 
> started this.
> 
> Here are some comments from an initial read.
> 
> AS1: Previously, Kafka transactions have always been a producer concept. You 
> set a configuration called `transactional.id` in the producer configuration 
> which gives authorization checking and helps with fencing. Then you use 
> KafkaProducer methods like beginTransaction and commitTransaction to mark the 
> start and end of the transaction. Only the producer is currently enabled for 
> transactional operation, and the Kafka RPCs it uses include transactional ID 
> as necessary to indicate operations which are transactional.
> 
> There will be differences for sure with share groups, but the overall model 
> should probably still hold. Over the years, we've gradually tightened up the 
> transaction protocol by adding features such as the producer epoch and 
> two-phase commit. I would rather not re-invent everything. For example, 
> WriteTxnMarkers is used by the transaction coordinator to tell the group 
> coordinator to complete transactional operations on the __consumer_offsets 
> topic. I would expect a similar kind of interaction with the share 
> coordinator on the __share_group_state topic.
> 
> I suggest looking at how to adapt the existing Kafka protocol RPCs rather 
> than defining new ones.
> 
> AS2: Why is there an acknowledge(ConsumerRecord<,>, AcknowledgeType, String) 
> method? I would have expected that the transactional ID is a configuration, 
> not a parameter of this method.
> 
> AS3: Have you considered how to write an application which uses a share group 
> and producer, and coordinates their operations in the same transaction?
> 
> 
> Thanks,
> Andrew
> 
> On 2026/02/24 08:54:36 Shekhar Prasad Rajak via dev wrote:
> > Hello team,
> > In this discussion thread we will finalise the design for the KIP-1289 
> > Support Hierarchical Transactional Acknowledgments for Share Groups. 
> > 
> > Share Groups currently support only immediate acknowledgement modes 
> > (IMPLICIT/EXPLICIT). This causes data loss in distributed streaming 
> > frameworks:
> > 
> > 1. Worker acknowledges records → Records removed from Kafka
> > 2. Checkpoint fails before sink write
> > 3. Records lost (acknowledged but never persisted)
> > 
> > Goal: Enable exactly-once read semantics via transactional acknowledgements.
> > We already have a similar pattern in Kafka producer transactions.
> > 
> > KIP: KIP-1289 Support Hierarchical Transactional Acknowledgments for Share 
> > Groups - Apache Kafka - Apache Software Foundation 
> > 
> > Looking forward to community's feedback! 
> > 
> > 
> > Regards,Shekhar Prasad Rajak
> > 
> > 
>   
